夜航星
Lila's dream
Lila's dream
Published on 2025-07-20 / 3 Visits
0
0

SSE 聊天功能的 Spring 生命周期详解

#AI

一、概述

本文档详细说明 java ai对话聊天模块中聊天 SSE(Server-Sent Events)功能背后 Spring 框架的核心机制,包括:

  • Spring WebFlux 如何处理响应式流

  • 核心类如何注入到 IOC 容器

  • 不同聊天类型的对话如何通过策略模式切换

  • 完整的请求处理生命周期


二、技术栈与依赖

2.1 核心依赖

<!-- Spring WebFlux 响应式 Web 框架 -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
</dependency>

<!-- Project Reactor 响应式编程库 -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

2.2 关键注解

  • @SpringBootApplication: 启用自动配置和组件扫描

  • @RestController: 标记 REST 控制器

  • @Component: 标记组件,自动注册到 IOC 容器

  • @RequiredArgsConstructor: Lombok 生成构造函数注入


三、Spring Boot 启动过程

3.1 启动入口

@SpringBootApplication
public class PigxKnowledgeApplication {
    public static void main(String[] args) {
        SpringApplication.run(PigxKnowledgeApplication.class, args);
    }
}

3.2 启动阶段的关键步骤

阶段 1: 应用上下文初始化

  1. 创建 ApplicationContext

    • Spring Boot 创建 AnnotationConfigServletWebServerApplicationContext(如果使用 WebFlux,则是 AnnotationConfigReactiveWebServerApplicationContext

    • 但由于项目同时引入了 spring-webflux,Spring Boot 会检测并启用 WebFlux 支持

  2. 组件扫描(Component Scanning)

    // Spring 扫描 @Component、@Service、@Controller、@RestController 等注解
    // 扫描路径:com.pig4cloud.pigx.knowledge 及其子包
    

    扫描到的关键类:

    • AiChatController (@RestController)

    • AiChatServiceImpl (@Service)

    • SimpleChatRule (@Component("simpleChat")) // 简单聊天

    • VectorChatRule (@Component("vectorChat")) // 知识库聊天

    • McpChatRule (@Component("mcpChat")) // mcp聊天

    • JsonChatRule (@Component("jsonChat")) // json聊天

    • DatabaseChatRule (@Component("databaseChat")) // 数据库聊天

    • 等等...

阶段 2: Bean 定义注册

Spring 为每个扫描到的组件创建 BeanDefinition

// 伪代码展示 Spring 内部处理
BeanDefinition aiChatControllerDef = new BeanDefinition();
aiChatControllerDef.setBeanClass(AiChatController.class);
aiChatControllerDef.setScope(Singleton);
// 注册到 BeanDefinitionRegistry

BeanDefinition simpleChatRuleDef = new BeanDefinition();
simpleChatRuleDef.setBeanClass(SimpleChatRule.class);
simpleChatRuleDef.setScope(Singleton);
simpleChatRuleDef.setBeanName("simpleChat"); // @Component("simpleChat")

阶段 3: Bean 实例化与依赖注入

3.3.1 ChatRule 实现类的注册

所有 ChatRule 实现类通过 @Component 注解自动注册:

@Component("simpleChat")  // Bean 名称为 "simpleChat"
public class SimpleChatRule implements ChatRule { ... }

@Component("vectorChat")  // Bean 名称为 "vectorChat"
public class VectorChatRule implements ChatRule { ... }

@Component("mcpChat")  // Bean 名称为 "mcpChat"
public class McpChatRule implements ChatRule { ... }

3.3.2 Map<String, ChatRule> 的自动注入

Spring 的特殊机制:当构造函数参数类型为 Map<String, Interface> 时,Spring 会自动收集所有该接口的实现类,以 bean 名称作为 key 注入:

@Service
@RequiredArgsConstructor  // 生成构造函数
public class AiChatServiceImpl implements ChatService {
    
    // Spring 自动注入:收集所有 ChatRule 实现类
    // key = bean 名称(@Component 的 value)
    // value = ChatRule 实例
    private final Map<String, ChatRule> chatRuleMap;
    
    // 等价于:
    // public AiChatServiceImpl(Map<String, ChatRule> chatRuleMap) {
    //     this.chatRuleMap = chatRuleMap;
    // }
}

Spring 内部处理逻辑(伪代码):

// Spring 容器初始化时
Map<String, ChatRule> chatRuleMap = new HashMap<>();

// 查找所有 ChatRule 类型的 bean
for (String beanName : beanFactory.getBeanNamesForType(ChatRule.class)) {
    ChatRule rule = beanFactory.getBean(beanName, ChatRule.class);
    chatRuleMap.put(beanName, rule);
}

// 注入到 AiChatServiceImpl
aiChatServiceImpl.setChatRuleMap(chatRuleMap);

最终注入结果:

chatRuleMap = {
    "simpleChat" -> SimpleChatRule 实例,
    "vectorChat" -> VectorChatRule 实例,
    "mcpChat" -> McpChatRule 实例,
    "jsonChat" -> JsonChatRule 实例,
    "databaseChat" -> DatabaseChatRule 实例,
    ...
}

3.3.3 其他核心 Bean 的注入

@Service
@RequiredArgsConstructor
public class AiChatServiceImpl implements ChatService {
    
    // Optional 类型:如果 LiteFlow 未配置,则为空
    private final Optional<FlowExecutor> flowExecutorOptional;
    
    // MyBatis Mapper:通过 @Mapper 扫描注册
    private final AiDatasetMapper datasetMapper;
    private final AiChatRecordMapper recordMapper;
    
    // Spring AI 配置的 Bean
    private final ChatMemoryProvider chatMemoryProvider;
    private final MessageWindowChatMemory messageWindowChatMemory;
}

阶段 4: WebFlux 自动配置

Spring Boot 检测到 spring-webflux 依赖后,自动配置:

  1. ReactiveWebServerFactory

    • 默认使用 Netty(如果 classpath 中有 reactor-netty

    • 或使用 Tomcat/Undertow(如果配置了相应依赖)

  2. WebFlux 配置类

    • WebFluxAutoConfiguration: 配置 WebFluxConfigurer

    • HttpHandlerAutoConfiguration: 配置 HTTP 处理器

    • ReactiveWebServerFactoryAutoConfiguration: 配置响应式 Web 服务器

  3. 消息编解码器(Codec)

    // Spring 自动注册的编解码器
    - ServerCodecConfigurer: 配置 HTTP 消息编解码
    - ServerSentEventHttpMessageWriter: 处理 SSE 格式
    - Jackson2JsonEncoder: JSON 编码
    

阶段 5: 控制器映射注册

Spring 扫描 @RestController 并注册路由:

@RestController
@RequestMapping("/chat")
public class AiChatController {
    
    @RequestMapping(value = "/msg/list", 
                   produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<AiMessageResultDTO> msg(@RequestParam Long key) {
        return chatService.chatList(key);
    }
}

Spring 内部注册(伪代码):

// HandlerMapping 注册
RequestMappingInfo mappingInfo = RequestMappingInfo
    .paths("/chat/msg/list")
    .methods(RequestMethod.POST, RequestMethod.GET)
    .produces(MediaType.TEXT_EVENT_STREAM_VALUE)
    .build();

HandlerMethod handlerMethod = new HandlerMethod(
    aiChatController,  // Controller 实例
    AiChatController.class.getMethod("msg", Long.class)
);

handlerMapping.register(mappingInfo, handlerMethod);

四、请求处理生命周期

4.1 HTTP 请求到达

客户端请求: GET /chat/msg/list?key=123
Accept: text/event-stream

4.2 WebFlux 请求处理链

步骤 1: Netty 接收请求

// Netty 接收 HTTP 请求
HttpServerRequest request = ...;
HttpServerResponse response = ...;

步骤 2: 路由匹配

// DispatcherHandler 查找匹配的 Handler
HandlerMapping handlerMapping = ...;
HandlerMethod handlerMethod = handlerMapping.getHandler(request);

// 匹配到: AiChatController.msg()

步骤 3: 参数解析

// HandlerMethodArgumentResolver 解析参数
@RequestParam Long key -> 从请求参数中提取 key = 123

步骤 4: 控制器方法调用

// 调用控制器方法
Flux<AiMessageResultDTO> result = aiChatController.msg(123L);

调用链:

AiChatController.msg(123L)
  -> chatService.chatList(123L)  // AiChatServiceImpl
    -> 查询数据库获取聊天记录
    -> 风险检测(可选)
    -> chatRuleMap.get(chatType).process(chatMessageDTO)
      -> SimpleChatRule.process() / VectorChatRule.process() / ...
        -> 调用 AI 模型 API
        -> 返回 Flux<AiMessageResultDTO>

4.3 Flux 流的处理

4.3.1 Flux 创建

// SimpleChatRule.process() 示例
public Flux<AiMessageResultDTO> process(ChatMessageDTO chatMessageDTO) {
    // 1. 获取 AI 流式助手
    AiStreamAssistantService assistant = modelProvider
        .getAiStreamAssistant(chatMessageDTO.getModelName())
        .getValue();
    
    // 2. 调用 AI 模型,返回 TokenStream
    TokenStream tokenStream = assistant.chatTokenStream(...);
    
    // 3. 创建 Flux
    return Flux.create(emitter -> {
        tokenStream
            .onPartialResponse(partialResponse -> {
                AiMessageResultDTO result = new AiMessageResultDTO();
                result.setMessage(partialResponse);
                emitter.next(result);  // 推送数据到流
            })
            .onCompleteResponse(completeResponse -> {
                emitter.complete();  // 完成流
            })
            .onError(error -> {
                emitter.error(error);  // 错误处理
            })
            .start();
    });
}

4.3.2 Flux 转换为 SSE

Spring WebFlux 的自动转换机制:

  1. 检测返回类型

    // Spring 检测到方法返回类型为 Flux
    ReturnValueHandler returnValueHandler = ...;
    // 使用 ResponseBodyResultHandler
    
  2. 检测 Content-Type

    // produces = MediaType.TEXT_EVENT_STREAM_VALUE
    // Spring 识别为 SSE 格式
    
  3. 选择消息写入器

    // ServerSentEventHttpMessageWriter
    // 负责将 Flux 转换为 SSE 格式
    
  4. SSE 格式转换

    SSE 协议格式:

    data: {"message":"Hello","finish":false}
    
    data: {"message":"World","finish":false}
    
    data: {"message":"","finish":true}
    
    

    Spring 内部转换(伪代码):

    ServerSentEventHttpMessageWriter writer = ...;
    
    Flux<AiMessageResultDTO> flux = ...;
    
    // 将每个元素转换为 SSE 事件
    Flux<ServerSentEvent<AiMessageResultDTO>> sseFlux = flux.map(data -> {
        return ServerSentEvent.builder()
            .data(data)
            .build();
    });
    
    // 写入 HTTP 响应
    response.writeWith(sseFlux.map(event -> {
        // 格式化为 SSE 文本格式
        String sseText = formatAsSSE(event);
        return dataBufferFactory.wrap(sseText.getBytes());
    }));
    

4.3.3 流式传输

// 响应式流式传输
Flux<DataBuffer> responseFlux = flux
    .map(dto -> {
        // 序列化为 JSON
        String json = objectMapper.writeValueAsString(dto);
        // 格式化为 SSE 格式
        String sse = "data: " + json + "\n\n";
        // 转换为 DataBuffer
        return dataBufferFactory.wrap(sse.getBytes());
    })
    .doOnNext(buffer -> {
        // 立即写入响应(非阻塞)
        response.writeAndFlush(buffer);
    });

关键特性:

  • 非阻塞: 使用 Netty 的异步 I/O

  • 背压(Backpressure): Reactor 自动处理流控

  • 流式传输: 数据产生即发送,无需等待全部完成

4.4 完整请求流程图

┌─────────────────────────────────────────────────────────────┐
│ 1. 客户端请求                                                │
│    GET /chat/msg/list?key=123                              │
│    Accept: text/event-stream                               │
└───────────────────────┬─────────────────────────────────────┘
                        │
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 2. Netty 接收请求                                            │
│    - HttpServerRequest                                       │
│    - HttpServerResponse                                      │
└───────────────────────┬─────────────────────────────────────┘
                        │
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 3. DispatcherHandler 路由匹配                                │
│    - HandlerMapping.getHandler()                            │
│    - 匹配到: AiChatController.msg()                          │
└───────────────────────┬─────────────────────────────────────┘
                        │
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 4. 参数解析                                                  │
│    - @RequestParam Long key = 123                           │
└───────────────────────┬─────────────────────────────────────┘
                        │
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 5. 控制器方法调用                                            │
│    aiChatController.msg(123L)                               │
│      ↓                                                       │
│    chatService.chatList(123L)                               │
│      ↓                                                       │
│    - 查询数据库                                              │
│    - 风险检测(可选)                                        │
│    - chatRuleMap.get(type).process()                        │
│      ↓                                                       │
│    - 调用 AI 模型 API                                        │
│    - 返回 Flux<AiMessageResultDTO>                          │
└───────────────────────┬─────────────────────────────────────┘
                        │
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 6. ResponseBodyResultHandler 处理返回值                       │
│    - 检测返回类型: Flux<AiMessageResultDTO>                  │
│    - 检测 Content-Type: text/event-stream                    │
│    - 选择: ServerSentEventHttpMessageWriter                  │
└───────────────────────┬─────────────────────────────────────┘
                        │
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 7. SSE 格式转换                                              │
│    Flux<AiMessageResultDTO>                                  │
│      ↓ map()                                                 │
│    Flux<ServerSentEvent<AiMessageResultDTO>>                 │
│      ↓ map()                                                 │
│    Flux<DataBuffer>  (SSE 格式文本)                          │
└───────────────────────┬─────────────────────────────────────┘
                        │
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 8. 流式写入响应                                              │
│    - 每个元素立即写入(非阻塞)                              │
│    - 格式: data: {...}\n\n                                  │
│    - 客户端实时接收                                          │
└───────────────────────┬─────────────────────────────────────┘
                        │
                        ↓
┌─────────────────────────────────────────────────────────────┐
│ 9. 客户端接收 SSE 流                                         │
│    - EventSource API                                        │
│    - 实时显示 AI 响应                                        │
└─────────────────────────────────────────────────────────────┘

五、核心类注入到 IOC 容器的完整过程

5.1 组件扫描阶段

// Spring Boot 启动时
@ComponentScan(basePackages = "com.pig4cloud.pigx.knowledge")

扫描到的类:

  1. Controller 层

    @RestController  // -> Bean: aiChatController
    public class AiChatController { ... }
    
  2. Service 层

    @Service  // -> Bean: aiChatServiceImpl
    public class AiChatServiceImpl implements ChatService { ... }
    
  3. Rule 层(策略模式)

    @Component("simpleChat")  // -> Bean: simpleChat
    public class SimpleChatRule implements ChatRule { ... }
    
    @Component("vectorChat")  // -> Bean: vectorChat
    public class VectorChatRule implements ChatRule { ... }
    
    @Component("mcpChat")  // -> Bean: mcpChat
    public class McpChatRule implements ChatRule { ... }
    

5.2 Bean 定义注册

// Spring 内部处理(简化版)
BeanDefinitionRegistry registry = ...;

// 1. 注册 Controller
registry.registerBeanDefinition("aiChatController", 
    new RootBeanDefinition(AiChatController.class));

// 2. 注册 Service
registry.registerBeanDefinition("aiChatServiceImpl",
    new RootBeanDefinition(AiChatServiceImpl.class));

// 3. 注册所有 ChatRule 实现
registry.registerBeanDefinition("simpleChat",
    new RootBeanDefinition(SimpleChatRule.class));
registry.registerBeanDefinition("vectorChat",
    new RootBeanDefinition(VectorChatRule.class));
// ... 其他 Rule

5.3 Bean 实例化顺序

依赖关系:

AiChatController
  └─> ChatService (AiChatServiceImpl)
        ├─> Map<String, ChatRule> (需要所有 ChatRule 实例)
        ├─> AiChatRecordMapper
        ├─> AiDatasetMapper
        └─> ChatMemoryProvider

实例化顺序:

  1. 第一步:实例化 ChatRule 实现类

    // Spring 实例化所有 ChatRule(无依赖或依赖已满足)
    SimpleChatRule simpleChatRule = new SimpleChatRule(...);
    VectorChatRule vectorChatRule = new VectorChatRule(...);
    McpChatRule mcpChatRule = new McpChatRule(...);
    // ... 注册到容器
    
  2. 第二步:收集 ChatRule 到 Map

    // Spring 自动收集(在注入 AiChatServiceImpl 时)
    Map<String, ChatRule> chatRuleMap = new HashMap<>();
    chatRuleMap.put("simpleChat", simpleChatRule);
    chatRuleMap.put("vectorChat", vectorChatRule);
    chatRuleMap.put("mcpChat", mcpChatRule);
    // ...
    
  3. 第三步:实例化 Service

    // Spring 调用构造函数注入
    AiChatServiceImpl service = new AiChatServiceImpl(
        flowExecutorOptional,      // Optional<FlowExecutor>
        chatRuleMap,               // Map<String, ChatRule> - 自动收集
        datasetMapper,             // MyBatis Mapper
        recordMapper,              // MyBatis Mapper
        chatMemoryProvider,        // ChatMemoryProvider
        messageWindowChatMemory    // MessageWindowChatMemory
    );
    
  4. 第四步:实例化 Controller

    // Spring 调用构造函数注入
    AiChatController controller = new AiChatController(
        chatService,           // AiChatServiceImpl
        generateService,       // AiGenerateService
        chartGenerateService   // AiChartGenerateService
    );
    

5.4 Map<String, ChatRule> 注入机制详解

Spring 的特殊注入规则:

当构造函数参数类型为 Map<String, Interface> 时:

  1. 查找所有实现该接口的 Bean

    // Spring 内部逻辑(简化)
    Class<?> interfaceType = ChatRule.class;
    Map<String, ChatRule> beans = new LinkedHashMap<>();
    
    // 获取所有 ChatRule 类型的 bean 名称
    String[] beanNames = beanFactory.getBeanNamesForType(interfaceType);
    
    for (String beanName : beanNames) {
        ChatRule bean = beanFactory.getBean(beanName, interfaceType);
        beans.put(beanName, bean);  // key = bean 名称
    }
    
  2. Bean 名称来源

    • @Component("simpleChat") -> key = "simpleChat"

    • @Component("vectorChat") -> key = "vectorChat"

    • 如果没有指定名称,使用类名首字母小写:SimpleChatRule -> "simpleChatRule"

  3. 注入到目标 Bean

    // Spring 调用构造函数时传入收集好的 Map
    public AiChatServiceImpl(Map<String, ChatRule> chatRuleMap) {
        this.chatRuleMap = chatRuleMap;
    }
    

六、启动时的关键初始化

6.1 应用上下文刷新

// SpringApplication.run() 内部
ConfigurableApplicationContext context = ...;
context.refresh();  // 关键方法

refresh() 执行流程:

  1. prepareRefresh(): 准备刷新

  2. obtainFreshBeanFactory(): 获取 BeanFactory

  3. prepareBeanFactory(): 准备 BeanFactory

  4. postProcessBeanFactory(): 后处理 BeanFactory

  5. invokeBeanFactoryPostProcessors(): 执行 BeanFactoryPostProcessor

  6. registerBeanPostProcessors(): 注册 BeanPostProcessor

  7. initMessageSource(): 初始化消息源

  8. initApplicationEventMulticaster(): 初始化事件广播器

  9. onRefresh(): 子类刷新(Web 应用创建 WebServer)

  10. registerListeners(): 注册监听器

  11. finishBeanFactoryInitialization(): 完成 Bean 初始化

  12. finishRefresh(): 完成刷新

6.2 WebFlux 服务器启动

onRefresh() 阶段:

// ReactiveWebServerApplicationContext.onRefresh()
@Override
protected void onRefresh() {
    super.onRefresh();
    try {
        createWebServer();  // 创建 Netty 服务器
    } catch (Throwable ex) {
        throw new ApplicationContextException(...);
    }
}

protected void createWebServer() {
    // 获取 ReactiveWebServerFactory(Netty)
    ReactiveWebServerFactory factory = getWebServerFactory();
    
    // 创建 HttpHandler(处理 HTTP 请求)
    this.webServer = factory.getWebServer(getHttpHandler());
    
    // 启动服务器
    this.webServer.start();
}

Netty 服务器启动:

// ReactorNettyWebServer.start()
@Override
public void start() throws WebServerException {
    if (this.nettyContext == null) {
        this.nettyContext = startHttpServer();
    }
}

private NettyContext startHttpServer() {
    HttpServer server = HttpServer.create()
        .port(this.port)
        .handle(this.httpHandler);  // WebFlux 的 HttpHandler
    
    return server.bindNow();  // 绑定端口,开始监听
}

6.3 HandlerMapping 注册

finishBeanFactoryInitialization() 阶段:

// RequestMappingHandlerMapping 扫描 @RequestMapping
@Override
protected void initHandlerMethods() {
    for (String beanName : getCandidateBeanNames()) {
        if (isHandler(getBeanType(beanName))) {
            detectHandlerMethods(beanName);  // 检测处理方法
        }
    }
}

// 检测到 AiChatController
protected void detectHandlerMethods(Object handler) {
    Class<?> handlerType = handler.getClass();
    
    // 扫描所有方法
    Map<Method, RequestMappingInfo> methods = MethodIntrospector
        .selectMethods(handlerType, 
            (Method method) -> {
                RequestMapping requestMapping = 
                    AnnotatedElementUtils.findMergedAnnotation(
                        method, RequestMapping.class);
                return requestMapping != null ? 
                    createRequestMappingInfo(requestMapping) : null;
            });
    
    // 注册方法映射
    methods.forEach((method, mapping) -> {
        Method invocableMethod = AopUtils.selectInvocableMethod(
            method, handlerType);
        registerHandlerMethod(handler, invocableMethod, mapping);
    });
}

注册结果:

// HandlerMapping 内部存储
Map<RequestMappingInfo, HandlerMethod> handlerMethods = {
    RequestMappingInfo(
        paths = ["/chat/msg/list"],
        methods = [GET, POST],
        produces = ["text/event-stream"]
    ) -> HandlerMethod(
        bean = aiChatController,
        method = AiChatController.msg(Long)
    )
}

七、运行时请求处理详细流程

7.1 请求到达 Netty

// Netty 接收连接
Channel channel = ...;
HttpServerRequest request = ...;
HttpServerResponse response = ...;

// 转换为 Spring WebFlux 的 ServerHttpRequest
ServerHttpRequest serverRequest = new ReactorServerHttpRequest(request, bufferFactory);
ServerHttpResponse serverResponse = new ReactorServerHttpResponse(response, bufferFactory);

// 创建 ServerWebExchange
ServerWebExchange exchange = new DefaultServerWebExchange(
    serverRequest, serverResponse, handlerMapping, ...);

7.2 DispatcherHandler 处理

// DispatcherHandler.handle()
public Mono<Void> handle(ServerWebExchange exchange) {
    return Flux.fromIterable(this.handlerMappings)
        .concatMap(mapping -> mapping.getHandler(exchange))
        .next()
        .switchIfEmpty(createNotFoundError())
        .flatMap(handler -> invokeHandler(exchange, handler))
        .flatMap(result -> handleResult(exchange, result));
}

步骤分解:

  1. 查找 Handler

    // RequestMappingHandlerMapping.getHandler()
    HandlerMethod handlerMethod = this.mappingRegistry
        .getMappingsByUrl(exchange.getRequest().getPath().value())
        .stream()
        .filter(mapping -> mapping.matches(exchange))
        .findFirst()
        .map(mapping -> mapping.getHandlerMethod())
        .orElse(null);
    
    // 找到: AiChatController.msg()
    
  2. 调用 Handler

    // InvocableHandlerMethod.invoke()
    Object[] args = resolveHandlerMethodArguments(
        handlerMethod, exchange, ...);
    
    // 解析参数: @RequestParam Long key = 123
    Object result = handlerMethod.invoke(aiChatController, args);
    
    // 返回: Flux<AiMessageResultDTO>
    
  3. 处理返回值

    // ResponseBodyResultHandler.handleResult()
    if (result instanceof Publisher) {
        // 检测 Content-Type
        MediaType mediaType = exchange.getResponse()
            .getHeaders()
            .getContentType();
        
        if (mediaType.equals(MediaType.TEXT_EVENT_STREAM)) {
            // 使用 ServerSentEventHttpMessageWriter
            return writeWithMessageWriter(
                exchange, 
                result,  // Flux<AiMessageResultDTO>
                ServerSentEventHttpMessageWriter.class
            );
        }
    }
    

7.3 SSE 消息写入

// ServerSentEventHttpMessageWriter.write()
public Mono<Void> write(Publisher<?> inputStream, 
                       ResolvableType elementType,
                       MediaType mediaType,
                       ServerHttpResponse response,
                       Map<String, Object> hints) {
    
    // 设置响应头
    response.getHeaders().setContentType(
        new MediaType("text", "event-stream", StandardCharsets.UTF_8));
    response.getHeaders().set("Cache-Control", "no-cache");
    response.getHeaders().set("Connection", "keep-alive");
    
    // 转换 Flux
    Flux<DataBuffer> body = Flux.from(inputStream)
        .map(data -> {
            // 序列化为 JSON
            String json = objectMapper.writeValueAsString(data);
            
            // 格式化为 SSE 格式
            StringBuilder sb = new StringBuilder();
            sb.append("data: ").append(json).append("\n\n");
            
            // 转换为 DataBuffer
            return response.bufferFactory()
                .wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
        });
    
    // 写入响应(流式)
    return response.writeWith(body);
}

7.4 流式传输到客户端

// Netty 异步写入
response.writeAndFlush(dataBuffer)
    .addListener(future -> {
        if (future.isSuccess()) {
            // 继续写入下一个数据
        } else {
            // 处理错误
        }
    });

客户端接收:

// 浏览器端
const eventSource = new EventSource('/chat/msg/list?key=123');

eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    // 实时显示: data.message
    console.log(data.message);
};

八、关键配置类与自动配置

8.1 WebFlux 自动配置

配置类:WebFluxAutoConfiguration

@Configuration
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@AutoConfigureAfter({ReactiveWebServerFactoryAutoConfiguration.class})
public class WebFluxAutoConfiguration {
    
    @Bean
    @ConditionalOnMissingBean
    public DispatcherHandler webHandler(ApplicationContext applicationContext) {
        return new DispatcherHandler(applicationContext);
    }
    
    @Bean
    @ConditionalOnMissingBean
    public RequestMappingHandlerMapping requestMappingHandlerMapping(...) {
        return new RequestMappingHandlerMapping(...);
    }
    
    @Bean
    @ConditionalOnMissingBean
    public ResponseBodyResultHandler responseBodyResultHandler(...) {
        return new ResponseBodyResultHandler(...);
    }
}

8.2 消息编解码器配置

配置类:WebFluxCodecAutoConfiguration

@Configuration
@ConditionalOnClass({CodecConfigurer.class})
public class WebFluxCodecAutoConfiguration {
    
    @Bean
    @ConditionalOnMissingBean
    public ServerCodecConfigurer serverCodecConfigurer() {
        ServerCodecConfigurer configurer = new DefaultServerCodecConfigurer();
        
        // 注册 SSE 消息写入器
        configurer.writers().add(
            new ServerSentEventHttpMessageWriter(
                new Jackson2JsonEncoder()
            )
        );
        
        return configurer;
    }
}

九、总结

9.1 核心机制总结

  1. 组件扫描与注册

    • Spring Boot 启动时扫描所有 @Component 注解的类

    • 自动注册到 IOC 容器

  2. Map 注入机制

    • Spring 自动收集接口的所有实现类

    • 以 bean 名称作为 key 注入到 Map<String, Interface>

  3. WebFlux 响应式处理

    • 使用 Netty 作为底层服务器

    • 非阻塞异步处理请求

  4. Flux 转 SSE

    • Spring 自动检测 produces = TEXT_EVENT_STREAM

    • 使用 ServerSentEventHttpMessageWriter 转换格式

    • 流式传输,实时推送数据

9.2 完整生命周期时间线

启动阶段:
  T0: SpringApplication.run()
  T1: 创建 ApplicationContext
  T2: 组件扫描 (@Component, @Service, @RestController)
  T3: Bean 定义注册
  T4: Bean 实例化(按依赖顺序)
  T5: 依赖注入(包括 Map<String, ChatRule>)
  T6: WebFlux 自动配置
  T7: Netty 服务器启动
  T8: HandlerMapping 注册
  T9: 应用就绪

运行时:
  T10: HTTP 请求到达 Netty
  T11: DispatcherHandler 路由匹配
  T12: 参数解析
  T13: 控制器方法调用
  T14: Service 层处理
  T15: ChatRule 策略执行
  T16: AI 模型 API 调用
  T17: Flux 流创建
  T18: SSE 格式转换
  T19: 流式写入响应
  T20: 客户端实时接收

9.3 关键技术点

  • 响应式编程: Project Reactor (Flux/Mono)

  • 非阻塞 I/O: Netty

  • 流式传输: Server-Sent Events (SSE)

  • 依赖注入: Spring IOC 容器

  • 策略模式: ChatRule 接口 + 多个实现

  • 自动配置: Spring Boot AutoConfiguration


十、参考资料

  • Spring WebFlux 官方文档

  • Project Reactor 官方文档

  • Server-Sent Events (SSE) 规范

  • Spring Boot 自动配置原理


Comment