Spring Cloud Stream: Comprehensive Study Notes

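Contents

  • Spring Cloud Stream Overview
  • Core Concepts
  • Environment Setup
  • Quick Start
  • Message Binding
  • Message Producers
  • Message Consumers
  • Message Routing
  • Message Partitioning
  • Error Handling
  • Message Acknowledgment
  • Monitoring and Management
  • Practical Example
  • Best Practices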

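Spring Cloud Stream Overview

Spring Cloud Stream is a framework for building message-driven microservices. It builds on Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers.

Core Features
Declarative programming model: annotations simplify message handling
Flexible programming model: supports both functional and annotation-driven styles
Multiple middleware options: RabbitMQ, Apache Kafka, Amazon Kinesis, and others
Auto-configuration: works out of the box
Message conversion: automatic content-type conversion
Partitioning support: message partitioning and load balancing
Error handling: retries and dead-letter queues
Monitoring integration: integrates with Spring Boot Actuator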

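Use Cases

Producer Service      Message Broker      Consumer Service
       ↓                    ↓                    ↓
  Order Created       Kafka/RabbitMQ      Inventory Update
  User Registered     Topic/Queue         Email Notification
  Payment Success     Exchange            Audit Logging

Event-driven architecture: build loosely coupled microservice systems
Asynchronous processing: improve system responsiveness
Decoupling: remove direct service dependencies by communicating through the broker
Stream processing: process and analyze data in real time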

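Core Concepts

Binder
A binder is the bridge between Spring Cloud Stream and an external messaging system. It handles connecting to and communicating with the message middleware.

Application        Binding   Binder     Message Broker
     ↑                ↑         ↑             ↑
  @StreamListener   Input     Kafka      Apache Kafka
  @Output           Output    RabbitMQ   RabbitMQ
  Function          Channel   Kinesis    Amazon Kinesis

Binding
A binding connects an input or output declared in the application to a destination (topic, exchange, or queue) on the external messaging system, and so defines how the application interacts with the broker.

// Input binding
@Input("userInput")
SubscribableChannel userInput();

// Output binding
@Output("userOutput")
MessageChannel userOutput();

Channel
A channel is the pipe through which messages flow inside the application. Channels are either input channels or output channels.

MessageChannel: a channel for sending messages
SubscribableChannel: a channel for receiving messages; subscribers are notified as messages arrive
PollableChannel: a channel that consumers poll for messages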
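
The difference between a subscribable (push) channel and a pollable (pull) channel can be sketched with plain Java. This is a toy model only: `PushChannel` and `PullChannel` are hypothetical names invented for the illustration, not the Spring types, and the real channels carry `Message<?>` objects and behave differently in edge cases (for example when no subscriber is registered).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model only: these classes are NOT the Spring types, they just mirror the roles.
final class ChannelSketch {

    /** Push model: every subscriber is called when a message is sent. */
    static final class PushChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        /** Returns true when at least one subscriber received the message. */
        boolean send(String message) {
            subscribers.forEach(s -> s.accept(message));
            return !subscribers.isEmpty();
        }
    }

    /** Pull model: messages wait in a queue until a consumer asks for one. */
    static final class PullChannel {
        private final Queue<String> queue = new ArrayDeque<>();

        boolean send(String message) {
            return queue.offer(message);
        }

        /** Returns the next message, or null when the queue is empty. */
        String receive() {
            return queue.poll();
        }
    }

    public static void main(String[] args) {
        PushChannel push = new PushChannel();
        push.subscribe(m -> System.out.println("pushed: " + m));
        push.send("hello");

        PullChannel pull = new PullChannel();
        pull.send("world");
        System.out.println("polled: " + pull.receive());
    }
}
```

In the push model the sender drives delivery; in the pull model the consumer decides when to take the next message.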

Environment Setup

Dependency Configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/>
    </parent>
    
    <groupId>com.example</groupId>
    <artifactId>stream-demo</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2021.0.8</spring-cloud.version>
    </properties>
    
    <dependencies>
        <!-- Spring Cloud Stream (RabbitMQ binder) -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        
        <!-- Or use the Kafka binder instead -->
        <!--<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>-->
        
        <!-- Web support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!-- Monitoring -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        
        <!-- Test support -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

Basic Configuration

# application.yml
spring:
  application:
    name: stream-demo

  cloud:
    stream:
      # Default binder
      default-binder: rabbit

      # Binding configuration
      bindings:
        # Input binding
        input:
          destination: user-events
          content-type: application/json
          group: user-service-group
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-multiplier: 2.0

        # Output binding
        output:
          destination: user-events
          content-type: application/json
          producer:
            partition-count: 3

      # RabbitMQ binder configuration
      rabbit:
        binder:
          connection-name-prefix: stream-demo
        bindings:
          input:
            consumer:
              # Retry settings such as max-attempts belong to the common consumer section above
              auto-bind-dlq: true
              republish-to-dlq: true
          output:
            producer:
              routing-key-expression: "'user'"

      # Kafka binder configuration (if using Kafka)
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
          auto-add-partitions: true
        bindings:
          input:
            consumer:
              start-offset: earliest
              enable-dlq: true
          output:
            producer:
              sync: true

  # RabbitMQ connection configuration (spring.rabbitmq.*)
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

# Monitoring configuration
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: always

# Logging configuration
logging:
  level:
    org.springframework.cloud.stream: DEBUG
    org.springframework.integration: DEBUG
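Main Application Class

// Note: @EnableBinding and @StreamListener are deprecated as of Spring Cloud Stream 3.1
// and removed in 4.x; they still work with the 3.2.x line used in these notes.
@SpringBootApplication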
@EnableBinding({Processor.class, CustomChannels.class})
public class StreamDemoApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(StreamDemoApplication.class, args);
    }
}

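Quick Start

Basic Message Handling

// Annotation-based style (legacy)
@Component
@Slf4j
public class MessageHandler {
    
    @StreamListener("input")
    public void handleMessage(String message) {
        log.info("Received message: {}", message);
    }
    
    // Alternative to handleMessage: transform the message and forward it to "output".
    // Enable only one of the two, because when several @StreamListener methods share
    // a binding target, none of them may return a value.
    // @StreamListener("input")
    // @SendTo("output")
    public String processMessage(String message) {
        log.info("Processing message: {}", message);
        return "Result: " + message.toUpperCase();
    }
}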

// Functional style (recommended)
// With more than one functional bean, list the ones to bind in
// spring.cloud.function.definition (for example: uppercase;logger;timer)
@Component
@Slf4j
public class MessageProcessor {
    
    @Bean
    public Function<String, String> uppercase() {
        return message -> {
            log.info("Converting message to upper case: {}", message);
            return message.toUpperCase();
        };
    }
    
    @Bean
    public Consumer<String> logger() {
        return message -> {
            log.info("Logging message: {}", message);
        };
    }
    
    @Bean
    public Supplier<String> timer() {
        return () -> {
            String timestamp = LocalDateTime.now().toString();
            log.info("Generated timestamp: {}", timestamp);
            return timestamp;
        };
    }
}
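
The three functional roles map onto plain `java.util.function` types: a `Supplier` acts as a source, a `Function` as a processor, and a `Consumer` as a sink. The sketch below wires them together by hand, without any framework, to show the data flow. `exclaim`, `greeting`, and `runOnce` are hypothetical names added for the illustration; in a real application the framework does the wiring and polls the supplier itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class FunctionalModelSketch {

    // Processor role: one message in, one message out.
    static Function<String, String> uppercase() {
        return String::toUpperCase;
    }

    // Hypothetical second step, used only to show composition.
    static Function<String, String> exclaim() {
        return s -> s + "!";
    }

    // Source role: produces a message each time it is polled.
    static Supplier<String> greeting() {
        return () -> "hello";
    }

    /** Polls the source once, runs the processor, and hands the result to the sink. */
    static void runOnce(Supplier<String> source,
                        Function<String, String> processor,
                        Consumer<String> sink) {
        sink.accept(processor.apply(source.get()));
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        runOnce(greeting(), uppercase().andThen(exclaim()), received::add);
        System.out.println(received); // prints [HELLO!]
    }
}
```

In Spring Cloud Stream, the bindings of a function bean are named after the bean: a bean called `uppercase` gets `uppercase-in-0` and `uppercase-out-0`, and those names are what appear under `spring.cloud.stream.bindings`.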

Sending Messages

@RestController
@Slf4j
public class MessageController {
    
    @Autowired
    @Qualifier("output")
    private MessageChannel output;
    
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        boolean sent = output.send(MessageBuilder.withPayload(message).build());
        
        if (sent) {
            log.info("Message sent: {}", message);
            return ResponseEntity.ok("Message sent");
        } else {
            log.error("Failed to send message: {}", message);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Failed to send message");
        }
    }
}

Message Binding

Custom Channels

public interface CustomChannels {
    
    String USER_INPUT = "userInput";
    String USER_OUTPUT = "userOutput";
    String ORDER_INPUT = "orderInput";
    String ORDER_OUTPUT = "orderOutput";
    
    @Input(USER_INPUT)
    SubscribableChannel userInput();
    
    @Output(USER_OUTPUT)
    MessageChannel userOutput();
    
    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();
    
    @Output(ORDER_OUTPUT)
    MessageChannel orderOutput();
}

Dynamic Binding

@Component
@Slf4j
public class DynamicBindingService {
    
    // StreamBridge (Spring Cloud Stream 3.1+) creates and caches output bindings on demand
    @Autowired
    private StreamBridge streamBridge;
    
    @Autowired
    private BindingService bindingService;
    
    public boolean sendToDynamicDestination(String destination, Object payload) {
        // The first send to an unknown name creates an output binding for that destination;
        // later sends reuse it. Producer settings such as partition-count can be supplied
        // through spring.cloud.stream.bindings.<name>.producer.* properties.
        boolean sent = streamBridge.send(destination, payload, MimeTypeUtils.APPLICATION_JSON);
        
        log.info("Sent to dynamic destination: destination={}, sent={}", destination, sent);
        return sent;
    }
    
    public void destroyDynamicBinding(String bindingName) {
        bindingService.unbindProducers(bindingName);
        log.info("Destroyed dynamic binding: {}", bindingName);
    }
}

Conditional Binding

// @EnableBinding can only annotate a type, so each conditional binding
// gets its own nested configuration class
@Configuration
public class ConditionalBindingConfig {
    
    @Configuration
    @ConditionalOnProperty(name = "stream.user.enabled", havingValue = "true")
    @EnableBinding(UserChannels.class)
    static class UserChannelsConfig {
    }
    
    @Configuration
    @ConditionalOnProperty(name = "stream.order.enabled", havingValue = "true")
    @EnableBinding(OrderChannels.class)
    static class OrderChannelsConfig {
    }
}

Message Producers

Basic Producer

@Service
@Slf4j
public class MessageProducer {
    
    @Autowired
    @Qualifier("output")
    private MessageChannel output;
    
    public void sendMessage(Object payload) {
        Message<Object> message = MessageBuilder
            .withPayload(payload)
            // The framework reads the content type from the "contentType" header
            .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
            // "timestamp" is a reserved, read-only header, so use a custom name
            .setHeader("sent-at", System.currentTimeMillis())
            .build();
            
        boolean sent = output.send(message);
        log.info("Message send {}: {}", sent ? "succeeded" : "failed", payload);
    }
    
    public void sendMessageWithHeaders(Object payload, Map<String, Object> headers) {
        MessageBuilder<Object> builder = MessageBuilder.withPayload(payload);
        
        headers.forEach(builder::setHeader);
        
        Message<Object> message = builder.build();
        output.send(message);
    }
}

Batch Producer

@Service
@Slf4j
public class BatchMessageProducer {
    
    @Autowired
    @Qualifier("output")
    private MessageChannel output;
    
    public void sendBatchMessages(List<Object> payloads) {
        // One ID for the whole batch, so consumers can group its messages
        String batchId = UUID.randomUUID().toString();
        
        // Send sequentially to preserve the order of the list
        payloads.forEach(payload -> {
            Message<Object> message = MessageBuilder
                .withPayload(payload)
                .setHeader("batch-id", batchId)
                .setHeader("batch-size", payloads.size())
                .build();
                
            output.send(message);
        });
        
        log.info("Batch send complete, count: {}", payloads.size());
    }
}

Scheduled Producer

@Component
@Slf4j
public class ScheduledProducer {
    
    @Autowired
    private MessageProducer messageProducer;
    
    @Scheduled(fixedRate = 10000) // Every 10 seconds
    public void sendHeartbeat() {
        HeartbeatMessage heartbeat = HeartbeatMessage.builder()
            .timestamp(Instant.now())
            .serviceId("stream-demo")
            .status("HEALTHY")
            .build();
            
        messageProducer.sendMessage(heartbeat);
        log.info("Sent heartbeat message: {}", heartbeat);
    }
    
    @Scheduled(cron = "0 0 * * * *") // At the top of every hour
    public void sendHourlyReport() {
        HourlyReport report = generateHourlyReport();
        messageProducer.sendMessage(report);
        log.info("Sent hourly report: {}", report);
    }
    
    // getMessageCount() and getErrorCount() are application-specific and not shown here
    private HourlyReport generateHourlyReport() {
        return HourlyReport.builder()
            .timestamp(Instant.now())
            .messageCount(getMessageCount())
            .errorCount(getErrorCount())
            .build();
    }
}

Message Consumers

Basic Consumer

@Component
@Slf4j
public class MessageConsumer {
    
    // Note: both listeners below are bound to "input", so every message is delivered to both
    @StreamListener("input")
    public void handleMessage(String message) {
        log.info("Processing message: {}", message);
        
        try {
            // Business logic
            processBusinessLogic(message);
            log.info("Message processed successfully: {}", message);
        } catch (Exception e) {
            log.error("Message processing failed: {}", message, e);
            throw e; // Rethrow to trigger the retry mechanism
        }
    }
    
    @StreamListener("input")
    public void handleUserEvent(@Payload UserEvent event, 
                               @Headers Map<String, Object> headers) {
        log.info("Processing user event: {}, headers: {}", event, headers);
        
        switch (event.getType()) {
            case USER_CREATED:
                handleUserCreated(event);
                break;
            case USER_UPDATED:
                handleUserUpdated(event);
                break;
            case USER_DELETED:
                handleUserDeleted(event);
                break;
            default:
                log.warn("Unknown user event type: {}", event.getType());
        }
    }
    
    private void processBusinessLogic(String message) {
        // Simulate business processing
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Conditional Consumer

@Component
@Slf4j
public class ConditionalConsumer {
    
    @StreamListener(value = "input", condition = "headers['event-type']=='user'")
    public void handleUserEvent(String message) {
        log.info("Processing user event: {}", message);
    }
    
    @StreamListener(value = "input", condition = "headers['event-type']=='order'")
    public void handleOrderEvent(String message) {
        log.info("Processing order event: {}", message);
    }
    
    @StreamListener(value = "input", condition = "headers['priority']=='high'")
    public void handleHighPriorityEvent(String message) {
        log.info("Processing high-priority event: {}", message);
    }
}
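
One detail of condition-based dispatch is easy to miss: the conditions are independent, so a message whose headers satisfy several of them reaches every matching handler. The following framework-free sketch models the three conditions above as Java predicates. The class and the `matchingHandlers` method are hypothetical names chosen for the illustration; only the header names and values come from the listener above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class ConditionalDispatchSketch {

    // Handler name -> condition on headers; mirrors the three SpEL conditions.
    private static final Map<String, Predicate<Map<String, Object>>> HANDLERS = new LinkedHashMap<>();

    static {
        HANDLERS.put("handleUserEvent", h -> "user".equals(h.get("event-type")));
        HANDLERS.put("handleOrderEvent", h -> "order".equals(h.get("event-type")));
        HANDLERS.put("handleHighPriorityEvent", h -> "high".equals(h.get("priority")));
    }

    /** Returns the names of all handlers whose condition matches the headers. */
    static List<String> matchingHandlers(Map<String, Object> headers) {
        List<String> matched = new ArrayList<>();
        HANDLERS.forEach((name, condition) -> {
            if (condition.test(headers)) {
                matched.add(name);
            }
        });
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(matchingHandlers(Map.of("event-type", "user", "priority", "high")));
        // prints [handleUserEvent, handleHighPriorityEvent]
        System.out.println(matchingHandlers(Map.of("event-type", "payment")));
        // prints []
    }
}
```

A message that matches no condition is handled by nobody, so it is worth deciding explicitly what should happen to such messages.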

Functional Consumer

@Component
@Slf4j
public class FunctionalConsumer {
    
    @Bean
    public Consumer<UserEvent> userEventHandler() {
        return userEvent -> {
            log.info("Handling user event (functional style): {}", userEvent);
            
            // Dispatch on the event type
            switch (userEvent.getType()) {
                case USER_CREATED:
                    sendWelcomeEmail(userEvent.getUserId());
                    break;
                case USER_UPDATED:
                    updateUserCache(userEvent.getUserId());
                    break;
            }
        };
    }
    
    @Bean
    public Function<OrderEvent, OrderProcessResult> orderProcessor() {
        return orderEvent -> {
            log.info("Handling order event (functional style): {}", orderEvent);
            
            // Order processing logic
            OrderProcessResult result = processOrder(orderEvent);
            
            log.info("Order processing result: {}", result);
            return result;
        };
    }
    
    // Reactive variant: analyze sensor data in 10-second windows and emit only HIGH alerts
    @Bean
    public Function<Flux<SensorData>, Flux<SensorAlert>> sensorDataProcessor() {
        return sensorDataFlux -> sensorDataFlux
            .window(Duration.ofSeconds(10))
            .flatMap(window -> window
                .collectList()
                .map(this::analyzeData)
                .filter(alert -> alert.getLevel() == AlertLevel.HIGH));
    }
}

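Message Routing

Content-Based Routing

@Component
@Slf4j
public class ContentBasedRouter {
    
    @StreamListener("input")
    @SendTo("output")
    public Object routeMessage(@Payload Object message, 
                              @Headers Map<String, Object> headers) {
        
        // String.valueOf avoids a NullPointerException in the switch when the header is missing
        String messageType = String.valueOf(headers.get("message-type"));
        
        switch (messageType) {
            case "user-event":
                return routeUserEvent(message, headers);
            case "order-event":
                return routeOrderEvent(message, headers);
            case "payment-event":
                return routePaymentEvent(message, headers);
            default:
                log.warn("Unknown message type: {}", messageType);
                return message;
        }
    }
    
    private Object routeUserEvent(Object message, Map<String, Object> headers) {
        return withRoutingKey(message, headers, "user.events");
    }
    
    private Object routeOrderEvent(Object message, Map<String, Object> headers) {
        return withRoutingKey(message, headers, "order.events");
    }
    
    // Not in the original notes; the key is assumed by analogy with the two routes above
    private Object routePaymentEvent(Object message, Map<String, Object> headers) {
        return withRoutingKey(message, headers, "payment.events");
    }
    
    // Message headers are immutable, so copy them into a new message and add the key there
    private Object withRoutingKey(Object message, Map<String, Object> headers, String routingKey) {
        return MessageBuilder.withPayload(message)
            .copyHeaders(headers)
            .setHeader("routing-key", routingKey)
            .build();
    }
}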

Dynamic Routing

@Component
@Slf4j
public class DynamicRouter {
    
    @Autowired
    private RoutingRuleService routingRuleService;
    
    @Autowired
    private ApplicationContext applicationContext;
    
    @ServiceActivator(inputChannel = "input")
    public void routeMessage(Message<?> message) {
        String payload = (String) message.getPayload();
        Map<String, Object> headers = message.getHeaders();
        
        // Load the routing rules
        List<RoutingRule> rules = routingRuleService.getRoutingRules();
        
        // The first matching rule wins
        for (RoutingRule rule : rules) {
            if (rule.matches(payload, headers)) {
                String destination = rule.getDestination();
                
                // Send to the target channel resolved at runtime
                MessageChannel channel = getChannelByName(destination);
                if (channel != null) {
                    channel.send(message);
                    log.info("Message routed to: {}", destination);
                } else {
                    log.warn("Target channel does not exist: {}", destination);
                }
                break;
            }
        }
    }
    
    private MessageChannel getChannelByName(String channelName) {
        // getBean throws when the bean is missing, so check first and return null instead
        if (!applicationContext.containsBean(channelName)) {
            return null;
        }
        return applicationContext.getBean(channelName, MessageChannel.class);
    }
}

// RoutingRule is an application-defined class that evaluates its condition in matches(...)
@Service
public class RoutingRuleService {
    
    public List<RoutingRule> getRoutingRules() {
        // Load routing rules from a database or configuration center
        return Arrays.asList(
            RoutingRule.builder()
                .condition("headers['user-type'] == 'vip'")
                .destination("vip-channel")
                .build(),
            RoutingRule.builder()
                .condition("payload.contains('urgent')")
                .destination("urgent-channel")
                .build()
        );
    }
}
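
The router above stops at the first rule that matches, so rule order matters. A minimal, framework-free sketch of that first-match evaluation follows. The `Rule` class and the `route` method are hypothetical, and the two rules are written as Java predicates rather than expression strings, which is an assumption about how `RoutingRule.matches` might be implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;

final class FirstMatchRouterSketch {

    /** Hypothetical rule type: a predicate over payload and headers plus a target name. */
    static final class Rule {
        final BiPredicate<String, Map<String, Object>> condition;
        final String destination;

        Rule(BiPredicate<String, Map<String, Object>> condition, String destination) {
            this.condition = condition;
            this.destination = destination;
        }
    }

    // The same two rules as in the notes, in the same order.
    static final List<Rule> RULES = List.of(
        new Rule((payload, headers) -> "vip".equals(headers.get("user-type")), "vip-channel"),
        new Rule((payload, headers) -> payload.contains("urgent"), "urgent-channel"));

    /** Returns the destination of the first matching rule, or empty when nothing matches. */
    static Optional<String> route(String payload, Map<String, Object> headers) {
        return RULES.stream()
            .filter(rule -> rule.condition.test(payload, headers))
            .map(rule -> rule.destination)
            .findFirst();
    }

    public static void main(String[] args) {
        // Both rules match, but the VIP rule comes first.
        System.out.println(route("urgent order", Map.of("user-type", "vip"))); // prints Optional[vip-channel]
        System.out.println(route("urgent order", Map.of()));                   // prints Optional[urgent-channel]
        System.out.println(route("regular order", Map.of()));                  // prints Optional.empty
    }
}
```

Returning an `Optional` makes the "no rule matched" case explicit, which the original loop silently ignores.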

Message Partitioning

Partition Configuration

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned-topic
          producer:
            partition-count: 4
            partition-key-expression: "headers['user-id']"
        
        input:
          destination: partitioned-topic
          group: partition-consumer-group
          consumer:
            partitioned: true
            instance-count: 4
            instance-index: 0

Custom Partitioning Strategy

// Reference these beans from the producer binding with
// partition-key-extractor-name and partition-selector-name
@Component
public class CustomPartitionKeyExtractor implements PartitionKeyExtractorStrategy {
    
    @Override
    public Object extractKey(Message<?> message) {
        // Partition by user ID header
        Object userId = message.getHeaders().get("user-id");
        if (userId != null) {
            return userId;
        }
        
        // Partition by message content
        Object payload = message.getPayload();
        if (payload instanceof UserEvent) {
            UserEvent event = (UserEvent) payload;
            return event.getUserId();
        }
        
        // Default key
        return "default";
    }
}

@Component
public class CustomPartitionSelector implements PartitionSelectorStrategy {
    
    @Override
    public int selectPartition(Object key, int partitionCount) {
        // Math.floorMod keeps the result in [0, partitionCount) even for negative values;
        // Math.abs(hashCode) % n can go negative when hashCode is Integer.MIN_VALUE
        if (key instanceof String) {
            return Math.floorMod(key.hashCode(), partitionCount);
        } else if (key instanceof Long) {
            return (int) Math.floorMod((Long) key, (long) partitionCount);
        }
        
        return 0; // Default partition
    }
}
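
A partition selector must always return an index in the range 0 to partitionCount - 1. The common idiom `Math.abs(key.hashCode()) % partitionCount` breaks that rule for one input: `Math.abs(Integer.MIN_VALUE)` overflows and stays negative. The standalone sketch below shows the failure and a `Math.floorMod` based alternative. `selectPartition` here is an illustrative helper, not the framework interface, and sending null keys to partition 0 is an assumption.

```java
final class PartitionSelectionSketch {

    /** Maps a key to a partition index in [0, partitionCount). */
    static int selectPartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        if (key == null) {
            return 0; // assumption: keyless messages go to partition 0
        }
        // floorMod never returns a negative result for a positive divisor.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        Object awkwardKey = Integer.MIN_VALUE; // its hashCode() is Integer.MIN_VALUE

        // Math.abs(Integer.MIN_VALUE) overflows and stays negative.
        System.out.println(Math.abs(awkwardKey.hashCode()) % 3); // prints -2
        System.out.println(selectPartition(awkwardKey, 3));      // prints 1

        // The same key always lands on the same partition.
        System.out.println(selectPartition("user-42", 4) == selectPartition("user-42", 4)); // prints true
    }
}
```

Because the mapping is deterministic, all messages for one key reach the same partition, which is what preserves per-key ordering.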

Partitioned Consumer

@Component
@Slf4j
public class PartitionedConsumer {
    
    @StreamListener("input")
    public void handlePartitionedMessage(String message, 
                                       @Header("scst_partition") int partition) {
        log.info("Processing message from partition {}: {}", partition, message);
        
        // Apply different processing logic per partition
        switch (partition) {
            case 0:
                handleHighPriorityMessages(message);
                break;
            case 1:
                handleMediumPriorityMessages(message);
                break;
            default:
                handleLowPriorityMessages(message);
        }
    }
    
    private void handleHighPriorityMessages(String message) {
        log.info("Processing high-priority message: {}", message);
    }
    
    private void handleMediumPriorityMessages(String message) {
        log.info("Processing medium-priority message: {}", message);
    }
    
    private void handleLowPriorityMessages(String message) {
        log.info("Processing low-priority message: {}", message);
    }
}

Error Handling

Global Error Handling

@Component
@Slf4j
public class GlobalErrorHandler {
    
    // Hypothetical output channel for failed messages; declare it in your own binding interface
    @Autowired
    @Qualifier("dlqOutput")
    private MessageChannel dlqChannel;
    
    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        Throwable throwable = errorMessage.getPayload();
        
        // The failed message is exposed by ErrorMessage, or by the wrapping MessagingException
        Message<?> failedMessage = errorMessage.getOriginalMessage();
        if (failedMessage == null && throwable instanceof MessagingException) {
            failedMessage = ((MessagingException) throwable).getFailedMessage();
        }
        
        // Handler exceptions usually arrive wrapped, so classify by the cause when present
        Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
        
        log.error("Message processing failed", throwable);
        
        if (failedMessage != null) {
            log.error("Failed message: {}", failedMessage.getPayload());
            
            // Handle differently depending on the exception type
            if (cause instanceof BusinessException) {
                handleBusinessException((BusinessException) cause, failedMessage);
            } else if (cause instanceof SystemException) {
                handleSystemException((SystemException) cause, failedMessage);
            } else {
                handleGenericException(cause, failedMessage);
            }
        }
    }
    
    private void handleBusinessException(BusinessException e, Message<?> message) {
        log.warn("Business exception: {}", e.getMessage());
        // Send to the business-error queue
        sendToDLQ(message, "business-error");
    }
    
    private void handleSystemException(SystemException e, Message<?> message) {
        log.error("System exception: {}", e.getMessage());
        // Send to the system-error queue
        sendToDLQ(message, "system-error");
    }
    
    private void handleGenericException(Throwable e, Message<?> message) {
        log.error("Generic exception: {}", e.getMessage());
        // Send to the generic-error queue
        sendToDLQ(message, "generic-error");
    }
    
    private void sendToDLQ(Message<?> message, String errorType) {
        // Build the error message
        Message<Object> errorMsg = MessageBuilder
            .withPayload((Object) message.getPayload())
            .setHeader("error-type", errorType)
            .setHeader("error-timestamp", Instant.now().toString())
            .setHeader("original-headers", message.getHeaders())
            .build();
        
        // Send to the dead-letter queue
        dlqChannel.send(errorMsg);
    }
}

Retry Mechanism

spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-multiplier: 2.0
            back-off-max-interval: 10000
            # When a custom RetryTemplate is named here, it replaces the four settings above
            retry-template-name: myRetryTemplate
      
      rabbit:
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true
              dlq-ttl: 86400000  # 24 hours
              dlq-max-length: 1000

@Configuration
@Slf4j
public class RetryConfig {
    
    @Bean
    public RetryTemplate myRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        
        // Retry policy
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        // Back-off policy
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        // Retry listener
        retryTemplate.registerListener(new RetryListenerSupport() {
            @Override
            public <T, E extends Throwable> void onError(RetryContext context, 
                                                        RetryCallback<T, E> callback, 
                                                        Throwable throwable) {
                log.warn("Attempt {} failed: {}", 
                    context.getRetryCount(), throwable.getMessage());
            }
        });
        
        return retryTemplate;
    }
}
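
To make the back-off settings concrete, the sketch below computes the waits an exponential back-off would produce for the values used above (initial interval 1000 ms, multiplier 2.0, maximum interval 10000 ms). It assumes the usual scheme: the first wait equals the initial interval, each later wait is multiplied, and every wait is capped at the maximum. `delays` is a hypothetical helper written for this illustration, not part of Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffScheduleSketch {

    /**
     * Returns the waits (in milliseconds) between attempts.
     * maxAttempts attempts need maxAttempts - 1 waits.
     */
    static List<Long> delays(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> delays = new ArrayList<>();
        double next = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Cap each wait at the configured maximum.
            delays.add(Math.min((long) next, maxInterval));
            next = next * multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(delays(3, 1000, 2.0, 10000)); // prints [1000, 2000]
        System.out.println(delays(6, 1000, 2.0, 10000)); // prints [1000, 2000, 4000, 8000, 10000]
    }
}
```

With three attempts, a message that keeps failing is therefore held for roughly three seconds of waiting plus processing time before it goes to the dead-letter queue.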

Custom Error Handling

@Component
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    
    @Autowired
    private ErrorNotificationService errorNotificationService;
    
    @Override
    public void handleError(Throwable t) {
        log.error("Handling error", t);
        
        // Build an error report
        ErrorReport errorReport = ErrorReport.builder()
            .timestamp(Instant.now())
            .errorType(t.getClass().getSimpleName())
            .errorMessage(t.getMessage())
            .stackTrace(getStackTrace(t))
            .build();
        
        // Persist the error report
        saveErrorReport(errorReport);
        
        // Send an alert for critical errors
        if (isCriticalError(t)) {
            errorNotificationService.sendCriticalAlert(errorReport);
        }
    }
    
    private boolean isCriticalError(Throwable t) {
        return t instanceof OutOfMemoryError ||
               t instanceof StackOverflowError ||
               t instanceof NoClassDefFoundError;
    }
    
    private String getStackTrace(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }
    
    private void saveErrorReport(ErrorReport errorReport) {
        // Save to a database or file
        log.debug("Saving error report: {}", errorReport);
    }
}

Message Acknowledgment

Manual Acknowledgment

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:
            consumer:
              acknowledge-mode: manual
              auto-bind-dlq: true

@Component
@Slf4j
public class ManualAckConsumer {
    
    // Listen on the "input" binding so that the acknowledge-mode setting above applies.
    // In manual mode the binder passes the channel and delivery tag as message headers.
    @StreamListener("input")
    public void handleMessage(@Payload String message, 
                             @Header(AmqpHeaders.CHANNEL) Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
        try {
            log.info("Processing message: {}", message);
            
            // Business logic
            processMessage(message);
            
            // Acknowledge manually
            channel.basicAck(deliveryTag, false);
            log.info("Message acknowledged: {}", message);
            
        } catch (BusinessException e) {
            log.warn("Business exception, rejecting message: {}", e.getMessage());
            // Reject without requeueing
            channel.basicNack(deliveryTag, false, false);
            
        } catch (Exception e) {
            log.error("Processing error, rejecting and requeueing message: {}", e.getMessage());
            // Reject and requeue; a message that always fails will be redelivered indefinitely
            channel.basicNack(deliveryTag, false, true);
        }
    }
    
    private void processMessage(String message) {
        // Simulate business processing
        if (message.contains("error")) {
            throw new BusinessException("Simulated business exception");
        }
        
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
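
The acknowledgment logic above comes down to a three-way decision: acknowledge on success, reject permanent failures, and requeue transient ones. Requeueing without a limit can loop forever on a message that never succeeds, so the sketch below adds a delivery cap. Everything here is hypothetical: the `Action` enum, the `decide` method, and especially `deliveryCount`, which assumes the application has some way to count deliveries (RabbitMQ does not pass such a counter to this listener by default).

```java
final class AckDecisionSketch {

    enum Action { ACK, REJECT, REQUEUE }

    /** Hypothetical marker for failures that retrying cannot fix. */
    static final class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    /**
     * Decides what to do with a message after processing.
     *
     * @param failure       the exception thrown by processing, or null on success
     * @param deliveryCount how many times the message has been delivered so far (assumed to be tracked)
     * @param maxDeliveries the delivery limit after which the message is rejected
     */
    static Action decide(Throwable failure, int deliveryCount, int maxDeliveries) {
        if (failure == null) {
            return Action.ACK;
        }
        if (failure instanceof BusinessException) {
            return Action.REJECT; // permanent failure: requeueing would not help
        }
        // Transient failure: requeue until the delivery limit is reached.
        return deliveryCount < maxDeliveries ? Action.REQUEUE : Action.REJECT;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, 1, 3));                               // prints ACK
        System.out.println(decide(new BusinessException("bad input"), 1, 3)); // prints REJECT
        System.out.println(decide(new IllegalStateException("timeout"), 1, 3)); // prints REQUEUE
        System.out.println(decide(new IllegalStateException("timeout"), 3, 3)); // prints REJECT
    }
}
```

With a dead-letter queue bound, a rejected message is not lost: it moves to the DLQ where it can be inspected or replayed.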

Transaction Support

@Configuration
@EnableTransactionManagement
public class TransactionConfig {
    
    // With two transaction managers in the context, one must be primary
    // so that a plain @Transactional knows which to use
    @Bean
    @Primary
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
    
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(
            ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

@Component
@Slf4j
public class TransactionalConsumer {
    
    // Application-specific collaborators (types assumed for this example)
    @Autowired
    private MessageRepository messageRepository;
    
    @Autowired
    @Qualifier("output")
    private MessageChannel downstreamChannel;
    
    @StreamListener("input")
    @Transactional(rollbackFor = Exception.class)
    public void handleTransactionalMessage(String message) {
        log.info("Processing message in a transaction: {}", message);
        
        try {
            // Database operation
            saveToDatabase(message);
            
            // Send downstream message
            sendDownstreamMessage(message);
            
            log.info("Transactional processing complete: {}", message);
            
        } catch (Exception e) {
            log.error("Transactional processing failed, rolling back: {}", message, e);
            throw e; // Triggers the transaction rollback
        }
    }
    
    private void saveToDatabase(String message) {
        // Database save logic
        messageRepository.save(new MessageRecord(message, Instant.now()));
    }
    
    private void sendDownstreamMessage(String message) {
        // Send downstream message
        downstreamChannel.send(MessageBuilder.withPayload(message).build());
    }
}

Monitoring and Management

Actuator Endpoints

management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    bindings:
      enabled: true
    channels:
      enabled: true
    health:
      show-details: always

Custom Health Check

@Component
public class StreamHealthIndicator implements HealthIndicator {
    
    @Autowired
    private BindingService bindingService;
    
    @Override
    public Health health() {
        try {
            // Check the status of each binding
            Set<String> bindingNames = getBindingNames();
            Map<String, Object> details = new HashMap<>();
            
            boolean allHealthy = true;
            for (String bindingName : bindingNames) {
                boolean isHealthy = checkBindingHealth(bindingName);
                details.put(bindingName, isHealthy ? "UP" : "DOWN");
                if (!isHealthy) {
                    allHealthy = false;
                }
            }
            
            return allHealthy ? Health.up().withDetails(details).build()
                             : Health.down().withDetails(details).build();
            
        } catch (Exception e) {
            return Health.down()
                .withDetail("error", e.getMessage())
                .build();
        }
    }
    
    private Set<String> getBindingNames() {
        // Collect all binding names
        return Arrays.stream(new String[]{"input", "output"})
            .collect(Collectors.toSet());
    }
    
    private boolean checkBindingHealth(String bindingName) {
        // Check the health of one specific binding
        try {
            // Connection state, queue state, and so on can be checked here
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

Metrics

@Component
@Slf4j
public class StreamMetrics {
    
    private final MeterRegistry meterRegistry;
    
    public StreamMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    // MessageProcessedEvent and MessageErrorEvent are application-defined events
    @EventListener
    public void handleMessageProcessed(MessageProcessedEvent event) {
        // Counter.increment() takes no tags. Tags are part of a meter's identity,
        // so look up (or create) the counter for this tag combination.
        Counter.builder("stream.messages.processed")
            .description("Total processed messages")
            .tag("binding", event.getBinding())
            .tag("type", event.getMessageType())
            .register(meterRegistry)
            .increment();
    }
    
    @EventListener
    public void handleMessageError(MessageErrorEvent event) {
        Counter.builder("stream.messages.error")
            .description("Total error messages")
            .tag("binding", event.getBinding())
            .tag("error", event.getErrorType())
            .register(meterRegistry)
            .increment();
    }
    
    public void recordProcessingTime(String binding, Duration duration) {
        // Timer.record accepts a Duration directly
        Timer.builder("stream.message.processing.duration")
            .description("Message processing duration")
            .tag("binding", binding)
            .register(meterRegistry)
            .record(duration);
    }
}

Practical Example

E-commerce Order Processing System

// Order event definition
@Data
@Builder
public class OrderEvent {
    private String orderId;
    private String userId;
    private OrderStatus status;
    private BigDecimal amount;
    private List<OrderItem> items;
    private Instant timestamp;
}

@Data
@Builder
public class OrderItem {
    private String productId;
    private String productName;
    private Integer quantity;
    private BigDecimal price;
}
// Order service
@Service
@Slf4j
public class OrderService {
    
    @Autowired
    @Qualifier("orderOutput")
    private MessageChannel orderOutput;
    
    // Used in createOrder below; OrderRepository is an assumed application type
    // that this listing does not show
    @Autowired
    private OrderRepository orderRepository;
    
    public Order createOrder(CreateOrderRequest request) {
        // 1. Create the order
        Order order = new Order();
        order.setId(UUID.randomUUID().toString());
        order.setUserId(request.getUserId());
        order.setItems(request.getItems());
        order.setAmount(calculateAmount(request.getItems()));
        order.setStatus(OrderStatus.CREATED);
        order.setCreatedAt(Instant.now());
        
        // 2. Persist the order
        orderRepository.save(order);
        
        // 3. Publish the order-created event
        OrderEvent event = OrderEvent.builder()
            .orderId(order.getId())
            .userId(order.getUserId())
            .status(OrderStatus.CREATED)
            .amount(order.getAmount())
            .items(convertToOrderItems(request.getItems()))
            .timestamp(Instant.now())
            .build();
        
        publishOrderEvent(event);
        
        return order;
    }
    
    private void publishOrderEvent(OrderEvent event) {
        // Headers let consumers filter on event type without deserializing the payload
        Message<OrderEvent> message = MessageBuilder
            .withPayload(event)
            .setHeader("event-type", "order")
            .setHeader("order-id", event.getOrderId())
            .setHeader("user-id", event.getUserId())
            .build();
        
        boolean sent = orderOutput.send(message);
        if (sent) {
            log.info("Order event published: {}", event);
        } else {
            log.error("Failed to publish order event: {}", event);
            throw new RuntimeException("Failed to publish order event");
        }
    }
}
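
OrderService.createOrder calls calculateAmount(request.getItems()), but the listing does not show that helper. One plausible shape is sketched below, assuming the order total is simply the sum of price × quantity over the items (the two fields OrderItem carries for this purpose). The names OrderAmounts and Line are hypothetical stand-ins, not part of the original code.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical helper; the original listing calls calculateAmount(...) without showing it.
final class OrderAmounts {

    // Minimal stand-in for OrderItem: only the two fields the total depends on.
    static final class Line {
        final int quantity;
        final BigDecimal price;

        Line(int quantity, BigDecimal price) {
            this.quantity = quantity;
            this.price = price;
        }
    }

    // Sums price * quantity over all lines; an empty order totals zero.
    static BigDecimal calculateAmount(List<Line> lines) {
        BigDecimal total = BigDecimal.ZERO;
        for (Line line : lines) {
            total = total.add(line.price.multiply(BigDecimal.valueOf(line.quantity)));
        }
        return total;
    }
}
```

BigDecimal is used throughout because the event model stores amounts as BigDecimal; comparing totals should go through compareTo rather than equals, since equals also compares scale.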
// Inventory service listener
@Component
@Slf4j
public class InventoryEventHandler {
    
    @Autowired
    private InventoryService inventoryService;
    
    // Output channel used by the publish* methods; the binding name "inventoryOutput"
    // is an assumption, since this listing does not declare the field
    @Autowired
    @Qualifier("inventoryOutput")
    private MessageChannel inventoryOutput;
    
    @StreamListener(value = "orderInput", 
                   condition = "headers['event-type']=='order' && payload.status.name()=='CREATED'")
    public void handleOrderCreated(OrderEvent orderEvent) {
        log.info("Handling order-created event, checking stock: {}", orderEvent.getOrderId());
        
        try {
            // Check stock
            boolean stockAvailable = inventoryService.checkStock(orderEvent.getItems());
            
            if (stockAvailable) {
                // Reserve stock
                inventoryService.reserveStock(orderEvent.getOrderId(), orderEvent.getItems());
                
                // Publish the stock-reserved event
                publishInventoryReservedEvent(orderEvent);
                
            } else {
                // Publish the insufficient-stock event
                publishInventoryInsufficientEvent(orderEvent);
            }
            
        } catch (Exception e) {
            log.error("Stock check failed: {}", orderEvent.getOrderId(), e);
            publishInventoryErrorEvent(orderEvent, e.getMessage());
        }
    }
    
    // publishInventoryInsufficientEvent and publishInventoryErrorEvent follow the
    // same pattern and are not shown here
    private void publishInventoryReservedEvent(OrderEvent orderEvent) {
        InventoryEvent event = InventoryEvent.builder()
            .orderId(orderEvent.getOrderId())
            .status(InventoryStatus.RESERVED)
            .timestamp(Instant.now())
            .build();
        
        inventoryOutput.send(MessageBuilder.withPayload(event).build());
        log.info("Stock-reserved event published: {}", event);
    }
}
// Payment service listener
@Component
@Slf4j
public class PaymentEventHandler {
    
    @Autowired
    private PaymentService paymentService;
    
    // Used below to load the order; this listing does not declare the field
    @Autowired
    private OrderService orderService;
    
    @StreamListener(value = "inventoryInput",
                   condition = "payload.status.name()=='RESERVED'")
    public void handleInventoryReserved(InventoryEvent inventoryEvent) {
        log.info("Handling stock-reserved event, starting payment: {}", inventoryEvent.getOrderId());
        
        try {
            // Load the order
            Order order = orderService.getOrder(inventoryEvent.getOrderId());
            
            // Process the payment
            PaymentResult result = paymentService.processPayment(
                order.getUserId(), order.getAmount(), order.getId());
            
            if (result.isSuccess()) {
                publishPaymentSuccessEvent(order, result);
            } else {
                publishPaymentFailedEvent(order, result);
            }
            
        } catch (Exception e) {
            log.error("Payment processing failed: {}", inventoryEvent.getOrderId(), e);
            publishPaymentErrorEvent(inventoryEvent.getOrderId(), e.getMessage());
        }
    }
}
// Notification service listener
@Component
@Slf4j
public class NotificationEventHandler {
    
    @Autowired
    private NotificationService notificationService;
    
    // Used below to look up contact details; UserService is an assumed application
    // type that this listing does not declare
    @Autowired
    private UserService userService;
    
    @StreamListener("paymentInput")
    public void handlePaymentEvent(PaymentEvent paymentEvent) {
        log.info("Handling payment event: {}", paymentEvent);
        
        switch (paymentEvent.getStatus()) {
            case SUCCESS:
                sendOrderSuccessNotification(paymentEvent);
                break;
            case FAILED:
                sendOrderFailedNotification(paymentEvent);
                break;
            case ERROR:
                sendPaymentErrorNotification(paymentEvent);
                break;
        }
    }
    
    private void sendOrderSuccessNotification(PaymentEvent event) {
        try {
            // Load the user
            User user = userService.getUser(event.getUserId());
            
            // Send the email notification
            notificationService.sendEmail(
                user.getEmail(),
                "Order payment successful",
                String.format("Your order %s has been paid. Amount: %s", 
                    event.getOrderId(), event.getAmount())
            );
            
            // Send the SMS notification
            notificationService.sendSms(
                user.getPhone(),
                String.format("Order %s paid successfully, amount %s CNY", 
                    event.getOrderId(), event.getAmount())
            );
            
            log.info("Order success notification sent: {}", event.getOrderId());
            
        } catch (Exception e) {
            // Notification failures are logged but not rethrown, so the message is not retried
            log.error("Failed to send order success notification: {}", event.getOrderId(), e);
        }
    }
}

Best Practices

1. Message Design Principles

// A well-designed message
@Data
@Builder
public class UserEvent {
    private String eventId;          // unique event identifier
    private String eventType;        // event type
    private String userId;           // business key
    private Instant timestamp;       // time the event occurred
    private String version;          // schema version
    private Map<String, Object> data; // business payload
    private Map<String, String> metadata; // metadata
}

// Message version compatibility
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "version")
@JsonSubTypes({
    @JsonSubTypes.Type(value = UserEventV1.class, name = "v1"),
    @JsonSubTypes.Type(value = UserEventV2.class, name = "v2")
})
public abstract class BaseUserEvent {
    protected String version;
}

2. Idempotent Processing

@Component
@Slf4j
public class IdempotentMessageHandler {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @StreamListener("input")
    public void handleMessage(@Payload UserEvent event, 
                             @Headers Map<String, Object> headers) {
        String messageId = event.getEventId();
        String idempotencyKey = "msg_processed:" + messageId;
        
        // Claim the message atomically (SET if absent, with a TTL). A separate
        // hasKey check followed by set would let two concurrent consumers both pass.
        Boolean claimed = redisTemplate.opsForValue()
            .setIfAbsent(idempotencyKey, "processed", Duration.ofHours(24));
        if (!Boolean.TRUE.equals(claimed)) {
            log.info("Message already processed, skipping: {}", messageId);
            return;
        }
        
        try {
            // Run the business logic
            processBusinessLogic(event);
            
            log.info("Message processed: {}", messageId);
            
        } catch (Exception e) {
            // Release the claim so that a redelivery can be processed
            redisTemplate.delete(idempotencyKey);
            log.error("Message processing failed: {}", messageId, e);
            throw e;
        }
    }
}
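
Idempotent handling depends on claiming a message id in one atomic step and releasing the claim when processing fails. The sketch below illustrates those semantics with an in-memory ConcurrentHashMap standing in for Redis; IdempotencyGuard and its methods are hypothetical names, and a real deployment would still need a shared store with expiry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for the Redis key space; not part of the original listing.
final class IdempotencyGuard {

    private final ConcurrentMap<String, Boolean> claimed = new ConcurrentHashMap<>();

    // Atomically claims a message id. Returns true only for the first caller.
    boolean tryClaim(String messageId) {
        return claimed.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    // Releases a claim after a failed attempt so that a redelivery can be processed.
    void release(String messageId) {
        claimed.remove(messageId);
    }

    // Runs the handler at most once per id; returns false when the message was skipped.
    boolean handleOnce(String messageId, Runnable handler) {
        if (!tryClaim(messageId)) {
            return false;
        }
        try {
            handler.run();
            return true;
        } catch (RuntimeException e) {
            release(messageId);
            throw e;
        }
    }
}
```

Claiming before processing trades one risk for another: a consumer that crashes between the claim and the release leaves the id claimed, which is why the Redis key in the handler carries a TTL.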

3. Message Serialization Tuning

@Configuration
public class StreamSerializationConfig {
    
    @Bean
    public MessageConverter messageConverter() {
        CompositeMessageConverter converter = new CompositeMessageConverter(
            Arrays.asList(
                new JsonMessageConverter(),
                new AvroMessageConverter(),
                new ProtobufMessageConverter()
            )
        );
        return converter;
    }
    
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
        return mapper;
    }
}

4. Performance Monitoring

@Component
@Slf4j
public class StreamPerformanceMonitor {
    
    private final MeterRegistry meterRegistry;
    private final Timer messageProcessingTimer;
    
    public StreamPerformanceMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.messageProcessingTimer = Timer.builder("stream.message.processing.time")
            .description("Message processing time")
            .register(meterRegistry);
    }
    
    @EventListener
    public void onMessageReceived(MessageReceivedEvent event) {
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            // Process the message
            processMessage(event.getMessage());
            countMessage(event.getBinding(), "success");
            
        } catch (Exception e) {
            countMessage(event.getBinding(), "error");
            throw e;
        } finally {
            // Record the elapsed time for both successful and failed messages
            sample.stop(messageProcessingTimer);
        }
    }
    
    // Counter.increment() accepts no tags, so the tagged counter is resolved
    // through the registry; register() returns the existing meter if present
    private void countMessage(String binding, String status) {
        Counter.builder("stream.message.count")
            .description("Message count")
            .tag("binding", binding)
            .tag("status", status)
            .register(meterRegistry)
            .increment();
    }
}

5. Production Configuration

# Production configuration
spring:
  cloud:
    stream:
      default-binder: rabbit
      
      bindings:
        input:
          destination: prod-user-events
          group: user-service-group
          content-type: application/json
          consumer:
            max-attempts: 3
            back-off-initial-interval: 2000
            back-off-multiplier: 2.0
            back-off-max-interval: 30000
            use-native-decoding: true
            
        output:
          destination: prod-user-events
          content-type: application/json
          producer:
            partition-count: 8
            required-groups: user-service-group
            use-native-encoding: true
      
      rabbit:
        binder:
          connection-name-prefix: stream-prod
          admin-addresses: rabbit-cluster:15672
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true
              dlq-ttl: 86400000
              max-length: 10000
              prefetch: 100
          output:
            producer:
              delivery-mode: PERSISTENT
              mandatory: true
              confirm-ack-channel: confirmAckChannel

  rabbitmq:
    addresses: rabbit-node1:5672,rabbit-node2:5672,rabbit-node3:5672
    username: ${RABBIT_USERNAME}
    password: ${RABBIT_PASSWORD}
    virtual-host: /prod
    connection-timeout: 30000
    publisher-confirm-type: correlated
    publisher-returns: true
    
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 50
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 2000
          multiplier: 2.0
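
To reason about the retry settings above (max-attempts, back-off-initial-interval, back-off-multiplier, back-off-max-interval), it can help to compute the waits they imply. The sketch below assumes a plain exponential back-off capped at the maximum interval, with max-attempts counting the first delivery; BackoffSchedule is a hypothetical helper for illustration, not a Spring API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for reasoning about retry settings; not a Spring API.
final class BackoffSchedule {

    // Returns the wait before each retry. maxAttempts counts the first delivery,
    // so there are maxAttempts - 1 waits.
    static List<Long> delaysMillis(int maxAttempts, long initialMillis,
                                   double multiplier, long maxMillis) {
        List<Long> delays = new ArrayList<>();
        double next = initialMillis;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap each wait at the configured maximum interval
            delays.add(Math.min((long) next, maxMillis));
            next = next * multiplier;
        }
        return delays;
    }
}
```

Under these assumptions, delaysMillis(3, 2000, 2.0, 30000) yields two waits, 2000 ms and 4000 ms, so the 30000 ms cap only comes into play if max-attempts is raised to 6 or more.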

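Summary

Spring Cloud Stream simplifies building message-driven microservices. After working through these notes, you should be comfortable with:

Core concepts: binders, bindings, and channels
Message handling: development patterns for producers and consumers
Advanced features: message routing, partitioning, and error handling
Monitoring and operations: health checks, metrics, and troubleshooting
Practical application: applying Stream to real business scenarios

Suggestions for Further Study
Binder internals: learn the characteristics of the different message brokers
Performance tuning: optimize for high-concurrency workloads
Cloud-native integration: integrate with Kubernetes and a service mesh
Event sourcing: build an event-driven data architecture
CQRS: combine Stream with Command Query Responsibility Segregation

Spring Cloud Stream is an important tool for building modern microservice architectures, and mastering it goes a long way toward building loosely coupled, scalable distributed systems.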

