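Contents
- Spring Cloud Stream overview
- Core concepts
- Environment setup
- Quick start
- Message bindings
- Message producers
- Message consumers
- Message routing
- Message partitioning
- Error handling
- Message acknowledgment
- Monitoring and management
- Case study
- Best practices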
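Spring Cloud Stream overview
Spring Cloud Stream is a framework for building message-driven microservices. It builds on Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to connect to message brokers.
Core features
- Declarative programming model: annotations simplify message handling
- Flexible programming model: supports both functional and annotation-driven styles
- Multiple broker support: RabbitMQ, Apache Kafka, Amazon Kinesis, and others
- Auto-configuration: works out of the box
- Message conversion: automatic content-type conversion
- Partitioning: message partitioning and load balancing
- Error handling: retries and dead-letter queues
- Monitoring: integrates with Spring Boot Actuator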
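Use cases
Producer Service  →  Message Broker  →  Consumer Service
       ↓                   ↓                   ↓
Order Created        Kafka/RabbitMQ      Inventory Update
User Registered      Topic/Queue         Email Notification
Payment Success      Exchange            Audit Logging
- Event-driven architecture: build loosely coupled microservice systems
- Asynchronous processing: improve system responsiveness
- Decoupling: remove direct service dependencies by going through a message broker
- Stream processing: process and analyze data in real time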
Core concepts
Binder
A binder is the bridge between Spring Cloud Stream and an external messaging system. It handles the connection to, and communication with, the message broker.
Application      →  Binding  →  Binder    →  Message Broker
     ↑                 ↑           ↑               ↑
@StreamListener     Input       Kafka        Apache Kafka
@Output             Output      RabbitMQ     RabbitMQ
Function            Channel     Kinesis      Amazon Kinesis
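Binding
A binding connects an input or output of the application to a destination on the external messaging system, and so defines how the application exchanges messages with it.
// Input binding
@Input("userInput")
SubscribableChannel userInput();
// Output binding
@Output("userOutput")
MessageChannel userOutput();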
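Channel
A channel is the pipe through which messages flow inside the application. Channels are either input channels or output channels.
- MessageChannel: a channel for sending messages
- SubscribableChannel: a channel for receiving messages
- PollableChannel: a channel that can be polled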
Environment setup
Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>stream-demo</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2021.0.8</spring-cloud.version>
    </properties>
    <dependencies>
        <!-- Spring Cloud Stream -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- Or use Kafka instead -->
        <!--<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>-->
        <!-- Web support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Monitoring -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
Basic configuration
# application.yml
spring:
  application:
    name: stream-demo
  cloud:
    stream:
      # Default binder
      default-binder: rabbit
      # Binding configuration
      bindings:
        # Input binding
        input:
          destination: user-events
          content-type: application/json
          group: user-service-group
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-multiplier: 2.0
        # Output binding
        output:
          destination: user-events
          content-type: application/json
          producer:
            partition-count: 3
      # RabbitMQ binder configuration
      rabbit:
        binder:
          connection-name-prefix: stream-demo
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true
              max-attempts: 3
          output:
            producer:
              routing-key-expression: "'user'"
      # Kafka binder configuration (if using Kafka)
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
          auto-add-partitions: true
        bindings:
          input:
            consumer:
              start-offset: earliest
              enable-dlq: true
          output:
            producer:
              sync: true
  # RabbitMQ connection settings
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
# Monitoring
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: always
# Logging
logging:
  level:
    org.springframework.cloud.stream: DEBUG
    org.springframework.integration: DEBUG
Main application class
@SpringBootApplication
@EnableBinding({Processor.class, CustomChannels.class})
public class StreamDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamDemoApplication.class, args);
    }
}
Quick start
Basic message handling
// Legacy annotation style
@Component
@Slf4j
public class MessageHandler {
    // Option 1: consume only
    @StreamListener("input")
    public void handleMessage(String message) {
        log.info("Received message: {}", message);
    }

    // Option 2: consume and forward the result to "output".
    // Treat the two handlers as alternatives: when several listeners share one
    // input binding, Spring Cloud Stream does not allow any of them to return a value.
    @StreamListener("input")
    @SendTo("output")
    public String processMessage(String message) {
        log.info("Processing message: {}", message);
        return "Result: " + message.toUpperCase();
    }
}
// Functional style (recommended)
@Component
@Slf4j
public class MessageProcessor {
    @Bean
    public Function<String, String> uppercase() {
        return message -> {
            log.info("Converting message to upper case: {}", message);
            return message.toUpperCase();
        };
    }

    @Bean
    public Consumer<String> logger() {
        return message -> {
            log.info("Logging message: {}", message);
        };
    }

    @Bean
    public Supplier<String> timer() {
        return () -> {
            String timestamp = LocalDateTime.now().toString();
            log.info("Generated timestamp: {}", timestamp);
            return timestamp;
        };
    }
}
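The three functional bean shapes map directly onto `java.util.function`: a `Supplier` is a source, a `Function` is a processor, and a `Consumer` is a sink. In the functional model the framework derives binding names from the bean name (for example `uppercase-in-0` and `uppercase-out-0`) and wires the beans to the broker. The sketch below leaves the broker out and only shows how the three shapes chain together in plain Java; `FunctionalModelSketch`, `source`, and `pipe` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Minimal sketch, not part of Spring Cloud Stream: plain-Java stand-ins for
// the Supplier / Function / Consumer beans, wired by hand instead of by a binder.
final class FunctionalModelSketch {

    // Source: produces one message each time it is polled.
    static Supplier<String> source(String value) {
        return () -> value;
    }

    // Processor: transforms an incoming message into an outgoing one.
    static Function<String, String> uppercase() {
        return message -> message.toUpperCase(Locale.ROOT);
    }

    // Pulls one message from the source, transforms it, and hands it to the sink.
    static void pipe(Supplier<String> in, Function<String, String> fn, Consumer<String> out) {
        out.accept(fn.apply(in.get()));
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        pipe(source("hello"), uppercase(), sink::add);
        System.out.println(sink); // prints [HELLO]
    }
}
```

Because each piece is an ordinary function, it can be unit-tested like this without starting a broker or a Spring context.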
Sending messages
@RestController
@Slf4j
public class MessageController {
    @Autowired
    @Qualifier("output")
    private MessageChannel output;

    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        boolean sent = output.send(MessageBuilder.withPayload(message).build());
        if (sent) {
            log.info("Message sent: {}", message);
            return ResponseEntity.ok("Message sent");
        } else {
            log.error("Failed to send message: {}", message);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Failed to send message");
        }
    }
}
Message bindings
Custom channels
public interface CustomChannels {
    String USER_INPUT = "userInput";
    String USER_OUTPUT = "userOutput";
    String ORDER_INPUT = "orderInput";
    String ORDER_OUTPUT = "orderOutput";

    @Input(USER_INPUT)
    SubscribableChannel userInput();

    @Output(USER_OUTPUT)
    MessageChannel userOutput();

    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();

    @Output(ORDER_OUTPUT)
    MessageChannel orderOutput();
}
Dynamic binding
// Sketch based on StreamBridge, which creates an output binding on first use
// when the given name is not a configured binding. Per-binding settings such as
// content type or partition count are supplied through
// spring.cloud.stream.bindings.<name>.* properties rather than in code.
@Component
@Slf4j
public class DynamicBindingService {
    @Autowired
    private StreamBridge streamBridge;

    @Autowired
    private BindingService bindingService;

    public boolean sendToDynamicDestination(String destination, Object payload) {
        // Resolves (or creates) the output binding and sends the payload
        boolean sent = streamBridge.send(destination, payload);
        log.info("Dynamic send {}: destination={}", sent ? "succeeded" : "failed", destination);
        return sent;
    }

    public void destroyDynamicBinding(String bindingName) {
        bindingService.unbindProducers(bindingName);
        log.info("Destroyed dynamic binding: {}", bindingName);
    }
}
Conditional binding
// @EnableBinding targets types, not @Bean methods, so each conditional binding
// is declared on its own nested configuration class.
@Configuration
public class ConditionalBindingConfig {
    @Configuration
    @ConditionalOnProperty(name = "stream.user.enabled", havingValue = "true")
    @EnableBinding(UserChannels.class)
    static class UserBindingConfig {
    }

    @Configuration
    @ConditionalOnProperty(name = "stream.order.enabled", havingValue = "true")
    @EnableBinding(OrderChannels.class)
    static class OrderBindingConfig {
    }
}
Message producers
Basic producer
@Service
@Slf4j
public class MessageProducer {
    @Autowired
    @Qualifier("output")
    private MessageChannel output;

    public void sendMessage(Object payload) {
        Message<Object> message = MessageBuilder
                .withPayload(payload)
                // The framework reads the content type from the "contentType" header
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                // "timestamp" is a read-only framework header, so use a custom name
                .setHeader("sent-at", System.currentTimeMillis())
                .build();
        boolean sent = output.send(message);
        log.info("Message send {}: {}", sent ? "succeeded" : "failed", payload);
    }

    public void sendMessageWithHeaders(Object payload, Map<String, Object> headers) {
        MessageBuilder<Object> builder = MessageBuilder.withPayload(payload);
        headers.forEach(builder::setHeader);
        Message<Object> message = builder.build();
        output.send(message);
    }
}
Batch producer
@Service
@Slf4j
public class BatchMessageProducer {
    @Autowired
    @Qualifier("output")
    private MessageChannel output;

    public void sendBatchMessages(List<Object> payloads) {
        // One ID per batch, shared by every message in it
        String batchId = UUID.randomUUID().toString();
        // parallelStream sends concurrently and does not preserve list order
        payloads.parallelStream().forEach(payload -> {
            Message<Object> message = MessageBuilder
                    .withPayload(payload)
                    .setHeader("batch-id", batchId)
                    .setHeader("batch-size", payloads.size())
                    .build();
            output.send(message);
        });
        log.info("Batch send finished, count: {}", payloads.size());
    }
}
Scheduled producer
// Requires @EnableScheduling on a configuration class
@Component
@Slf4j
public class ScheduledProducer {
    @Autowired
    private MessageProducer messageProducer;

    @Scheduled(fixedRate = 10000) // every 10 seconds
    public void sendHeartbeat() {
        HeartbeatMessage heartbeat = HeartbeatMessage.builder()
                .timestamp(Instant.now())
                .serviceId("stream-demo")
                .status("HEALTHY")
                .build();
        messageProducer.sendMessage(heartbeat);
        log.info("Sent heartbeat: {}", heartbeat);
    }

    @Scheduled(cron = "0 0 * * * *") // at the top of every hour
    public void sendHourlyReport() {
        HourlyReport report = generateHourlyReport();
        messageProducer.sendMessage(report);
        log.info("Sent hourly report: {}", report);
    }

    private HourlyReport generateHourlyReport() {
        return HourlyReport.builder()
                .timestamp(Instant.now())
                .messageCount(getMessageCount())
                .errorCount(getErrorCount())
                .build();
    }
}
Message consumers
Basic consumer
@Component
@Slf4j
public class MessageConsumer {
    // The two listeners below are alternatives. If both are registered on "input"
    // without a condition, every message is dispatched to both.
    @StreamListener("input")
    public void handleMessage(String message) {
        log.info("Handling message: {}", message);
        try {
            // Business logic
            processBusinessLogic(message);
            log.info("Message handled successfully: {}", message);
        } catch (Exception e) {
            log.error("Message handling failed: {}", message, e);
            throw e; // Rethrow to trigger the retry mechanism
        }
    }

    // @Headers (plural) injects the full header map; @Header binds a single header
    @StreamListener("input")
    public void handleUserEvent(@Payload UserEvent event,
                                @Headers Map<String, Object> headers) {
        log.info("Handling user event: {}, headers: {}", event, headers);
        switch (event.getType()) {
            case USER_CREATED:
                handleUserCreated(event);
                break;
            case USER_UPDATED:
                handleUserUpdated(event);
                break;
            case USER_DELETED:
                handleUserDeleted(event);
                break;
            default:
                log.warn("Unknown user event type: {}", event.getType());
        }
    }

    private void processBusinessLogic(String message) {
        // Simulated business processing
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Conditional consumer
@Component
@Slf4j
public class ConditionalConsumer {
    // Every listener whose condition matches is invoked, so a message with
    // event-type=user and priority=high reaches two of the handlers below.
    @StreamListener(value = "input", condition = "headers['event-type']=='user'")
    public void handleUserEvent(String message) {
        log.info("Handling user event: {}", message);
    }

    @StreamListener(value = "input", condition = "headers['event-type']=='order'")
    public void handleOrderEvent(String message) {
        log.info("Handling order event: {}", message);
    }

    @StreamListener(value = "input", condition = "headers['priority']=='high'")
    public void handleHighPriorityEvent(String message) {
        log.info("Handling high-priority event: {}", message);
    }
}
Functional consumer
@Component
@Slf4j
public class FunctionalConsumer {
    @Bean
    public Consumer<UserEvent> userEventHandler() {
        return userEvent -> {
            log.info("Functional handling of user event: {}", userEvent);
            // Dispatch on event type
            switch (userEvent.getType()) {
                case USER_CREATED:
                    sendWelcomeEmail(userEvent.getUserId());
                    break;
                case USER_UPDATED:
                    updateUserCache(userEvent.getUserId());
                    break;
                default:
                    log.warn("Unhandled user event type: {}", userEvent.getType());
            }
        };
    }

    @Bean
    public Function<OrderEvent, OrderProcessResult> orderProcessor() {
        return orderEvent -> {
            log.info("Functional handling of order event: {}", orderEvent);
            // Order processing logic
            OrderProcessResult result = processOrder(orderEvent);
            log.info("Order processing result: {}", result);
            return result;
        };
    }

    // Reactive variant: analyze sensor readings in 10-second windows
    // and emit only high-level alerts
    @Bean
    public Function<Flux<SensorData>, Flux<SensorAlert>> sensorDataProcessor() {
        return sensorDataFlux -> sensorDataFlux
                .window(Duration.ofSeconds(10))
                .flatMap(window -> window
                        .collectList()
                        .map(this::analyzeData)
                        .filter(alert -> alert.getLevel() == AlertLevel.HIGH));
    }
}
Message routing
Content-based routing
@Component
@Slf4j
public class ContentBasedRouter {
    @StreamListener("input")
    @SendTo("output")
    public Object routeMessage(@Payload Object message,
                               @Headers Map<String, Object> headers) {
        String messageType = (String) headers.get("message-type");
        if (messageType == null) {
            // switch on a null String would throw a NullPointerException
            log.warn("Missing message-type header");
            return message;
        }
        switch (messageType) {
            case "user-event":
                return routeUserEvent(message, headers);
            case "order-event":
                return routeOrderEvent(message, headers);
            case "payment-event":
                return routePaymentEvent(message, headers);
            default:
                log.warn("Unknown message type: {}", messageType);
                return message;
        }
    }

    private Object routeUserEvent(Object message, Map<String, Object> headers) {
        // The injected header map is read-only, so the routing key is set on the builder
        return MessageBuilder.withPayload(message)
                .copyHeaders(headers)
                .setHeader("routing-key", "user.events")
                .build();
    }

    private Object routeOrderEvent(Object message, Map<String, Object> headers) {
        return MessageBuilder.withPayload(message)
                .copyHeaders(headers)
                .setHeader("routing-key", "order.events")
                .build();
    }
}
Dynamic routing
@Component
@Slf4j
public class DynamicRouter {
    @Autowired
    private RoutingRuleService routingRuleService;

    @Autowired
    private ApplicationContext applicationContext;

    @ServiceActivator(inputChannel = "input")
    public void routeMessage(Message<?> message) {
        String payload = (String) message.getPayload();
        Map<String, Object> headers = message.getHeaders();
        // Load the routing rules
        List<RoutingRule> rules = routingRuleService.getRoutingRules();
        // The first matching rule wins
        for (RoutingRule rule : rules) {
            if (rule.matches(payload, headers)) {
                String destination = rule.getDestination();
                // Send to the target channel resolved at runtime
                MessageChannel channel = getChannelByName(destination);
                if (channel != null) {
                    channel.send(message);
                    log.info("Message routed to: {}", destination);
                } else {
                    log.warn("Target channel does not exist: {}", destination);
                }
                break;
            }
        }
    }

    private MessageChannel getChannelByName(String channelName) {
        // getBean throws when the bean is missing, so check first and return null instead
        if (!applicationContext.containsBean(channelName)) {
            return null;
        }
        return applicationContext.getBean(channelName, MessageChannel.class);
    }
}
@Service
public class RoutingRuleService {
    public List<RoutingRule> getRoutingRules() {
        // Load routing rules from a database or configuration center
        return Arrays.asList(
                RoutingRule.builder()
                        .condition("headers['user-type'] == 'vip'")
                        .destination("vip-channel")
                        .build(),
                RoutingRule.builder()
                        // SpEL has no "contains" operator; call String.contains instead
                        .condition("payload.contains('urgent')")
                        .destination("urgent-channel")
                        .build()
        );
    }
}
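The routing loop above is a first-match rule evaluation: rules are tried in order and the first one whose condition holds decides the destination. A minimal sketch of that logic in plain Java follows, with predicates standing in for the SpEL conditions. `RuleRouterSketch`, its `Rule` class, and `defaultRules` are hypothetical names, not part of Spring Cloud Stream.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;

// Minimal sketch: first-match routing over (payload, headers).
final class RuleRouterSketch {

    // Hypothetical rule shape: a condition plus the destination it selects.
    static final class Rule {
        final BiPredicate<String, Map<String, Object>> condition;
        final String destination;

        Rule(BiPredicate<String, Map<String, Object>> condition, String destination) {
            this.condition = condition;
            this.destination = destination;
        }
    }

    // Same two rules as in the text, expressed as predicates instead of SpEL strings.
    static List<Rule> defaultRules() {
        return List.of(
                new Rule((payload, headers) -> "vip".equals(headers.get("user-type")), "vip-channel"),
                new Rule((payload, headers) -> payload.contains("urgent"), "urgent-channel"));
    }

    // Returns the destination of the first matching rule, or empty if none matches.
    static Optional<String> route(List<Rule> rules, String payload, Map<String, Object> headers) {
        for (Rule rule : rules) {
            if (rule.condition.test(payload, headers)) {
                return Optional.of(rule.destination);
            }
        }
        return Optional.empty();
    }
}
```

Rule order matters: an urgent message from a VIP user goes to `vip-channel` because that rule is listed first. Returning `Optional.empty()` for unmatched messages forces the caller to decide what to do with them instead of dropping them silently.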
Message partitioning
Partition configuration
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned-topic
          producer:
            partition-count: 4
            partition-key-expression: "headers['user-id']"
        input:
          destination: partitioned-topic
          group: partition-consumer-group
          consumer:
            partitioned: true
            instance-count: 4
            # Must be unique per consumer instance (0 to instance-count - 1)
            instance-index: 0
Custom partitioning strategy
// To activate these beans, reference them from the producer binding through
// partition-key-extractor-name and partition-selector-name.
@Component
public class CustomPartitionKeyExtractor implements PartitionKeyExtractorStrategy {
    @Override
    public Object extractKey(Message<?> message) {
        // Partition by user ID header
        Object userId = message.getHeaders().get("user-id");
        if (userId != null) {
            return userId;
        }
        // Partition by message content
        Object payload = message.getPayload();
        if (payload instanceof UserEvent) {
            UserEvent event = (UserEvent) payload;
            return event.getUserId();
        }
        // Fallback key
        return "default";
    }
}
@Component
public class CustomPartitionSelector implements PartitionSelectorStrategy {
    @Override
    public int selectPartition(Object key, int partitionCount) {
        // Math.floorMod never returns a negative index, unlike
        // Math.abs(hash) % n (fails for Integer.MIN_VALUE) or % on a negative Long
        if (key instanceof String) {
            return Math.floorMod(key.hashCode(), partitionCount);
        } else if (key instanceof Long) {
            return (int) Math.floorMod((Long) key, (long) partitionCount);
        }
        return 0; // Default partition
    }
}
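A partition selector must always return an index in the range `0` to `partitionCount - 1`. Two common shortcuts break that contract: `Math.abs(hash) % n` is negative when the hash is `Integer.MIN_VALUE`, and Java's `%` keeps the sign of a negative dividend. The sketch below shows a selector built on `Math.floorMod`, which avoids both problems. `PartitionSketch` is a hypothetical name and this is one reasonable approach, not the framework's built-in selector.

```java
// Minimal sketch: map any key to a valid partition index.
final class PartitionSketch {

    // Returns an index in [0, partitionCount) for every key, including null
    // keys and keys whose hash code is negative.
    static int selectPartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        int hash = (key == null) ? 0 : key.hashCode();
        return Math.floorMod(hash, partitionCount);
    }

    public static void main(String[] args) {
        // Integer.MIN_VALUE is the hash for which Math.abs(hash) stays negative.
        System.out.println(Math.abs(Integer.MIN_VALUE) % 3);          // prints -2
        System.out.println(selectPartition(Integer.MIN_VALUE, 3));    // prints 1
    }
}
```

The same key always maps to the same partition, which is what keeps all events for one user in order on a single consumer instance.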
Partitioned consumer
@Component
@Slf4j
public class PartitionedConsumer {
    @StreamListener("input")
    public void handlePartitionedMessage(String message,
                                         @Header("scst_partition") int partition) {
        log.info("Handling message from partition {}: {}", partition, message);
        // Apply different handling depending on the partition
        switch (partition) {
            case 0:
                handleHighPriorityMessages(message);
                break;
            case 1:
                handleMediumPriorityMessages(message);
                break;
            default:
                handleLowPriorityMessages(message);
        }
    }

    private void handleHighPriorityMessages(String message) {
        log.info("Handling high-priority message: {}", message);
    }

    private void handleMediumPriorityMessages(String message) {
        log.info("Handling medium-priority message: {}", message);
    }

    private void handleLowPriorityMessages(String message) {
        log.info("Handling low-priority message: {}", message);
    }
}
Error handling
Global error handling
@Component
@Slf4j
public class GlobalErrorHandler {
    // Assumed output channel bound to the dead-letter destination
    @Autowired
    @Qualifier("dlqChannel")
    private MessageChannel dlqChannel;

    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        Throwable throwable = errorMessage.getPayload();
        Message<?> failedMessage = errorMessage.getOriginalMessage();
        // The payload is usually a MessagingException that wraps the real cause
        // and carries the failed message
        if (throwable instanceof MessagingException) {
            MessagingException messagingException = (MessagingException) throwable;
            if (failedMessage == null) {
                failedMessage = messagingException.getFailedMessage();
            }
            if (messagingException.getCause() != null) {
                throwable = messagingException.getCause();
            }
        }
        log.error("Message handling failed", throwable);
        if (failedMessage != null) {
            log.error("Failed message: {}", failedMessage.getPayload());
            // Handle according to exception type
            if (throwable instanceof BusinessException) {
                handleBusinessException((BusinessException) throwable, failedMessage);
            } else if (throwable instanceof SystemException) {
                handleSystemException((SystemException) throwable, failedMessage);
            } else {
                handleGenericException(throwable, failedMessage);
            }
        }
    }

    private void handleBusinessException(BusinessException e, Message<?> message) {
        log.warn("Business exception: {}", e.getMessage());
        // Send to the business-error queue
        sendToDLQ(message, "business-error");
    }

    private void handleSystemException(SystemException e, Message<?> message) {
        log.error("System exception: {}", e.getMessage());
        // Send to the system-error queue
        sendToDLQ(message, "system-error");
    }

    private void handleGenericException(Throwable e, Message<?> message) {
        log.error("Generic exception: {}", e.getMessage());
        // Send to the generic-error queue
        sendToDLQ(message, "generic-error");
    }

    private void sendToDLQ(Message<?> message, String errorType) {
        // Build the error message
        Message<Object> errorMsg = MessageBuilder
                .withPayload((Object) message.getPayload())
                .setHeader("error-type", errorType)
                .setHeader("error-timestamp", Instant.now().toString())
                .setHeader("original-headers", message.getHeaders())
                .build();
        // Send to the dead-letter queue
        dlqChannel.send(errorMsg);
    }
}
Retry mechanism
spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-multiplier: 2.0
            back-off-max-interval: 10000
            retry-template-name: myRetryTemplate
      rabbit:
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true
              dlq-ttl: 86400000 # 24 hours
              dlq-max-length: 1000
@Configuration
@Slf4j
public class RetryConfig {
    @Bean
    public RetryTemplate myRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // Retry policy: 3 attempts in total, including the first delivery
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        // Back-off policy: 1s, then doubling, capped at 10s
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        // Retry listener
        retryTemplate.registerListener(new RetryListenerSupport() {
            @Override
            public <T, E extends Throwable> void onError(RetryContext context,
                                                         RetryCallback<T, E> callback,
                                                         Throwable throwable) {
                log.warn("Attempt {} failed: {}",
                        context.getRetryCount(), throwable.getMessage());
            }
        });
        return retryTemplate;
    }
}
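It helps to see what the retry settings mean in wall-clock terms. `max-attempts` counts the first delivery, so three attempts give two waits; each wait is the previous one times the multiplier, capped at the maximum interval. The sketch below computes that schedule in plain Java. `BackOffSketch` and `waits` are hypothetical names, and the arithmetic is an approximation of exponential back-off, not Spring Retry's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: the waits (in ms) between attempts under exponential back-off.
final class BackOffSketch {

    // maxAttempts includes the first delivery, so the result has maxAttempts - 1 entries.
    static List<Long> waits(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add(Math.min((long) next, maxMs)); // cap each wait at maxMs
            next = next * multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Settings from the configuration above: 3 attempts, 1s initial, x2, 10s cap
        System.out.println(waits(3, 1000, 2.0, 10000)); // prints [1000, 2000]
        // With more attempts the cap becomes visible
        System.out.println(waits(6, 1000, 2.0, 10000)); // prints [1000, 2000, 4000, 8000, 10000]
    }
}
```

With the values above, a message that keeps failing is given up on roughly three seconds after the first delivery, at which point it goes to the dead-letter queue if one is configured.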
Custom error handling
@Component
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Autowired
    private ErrorNotificationService errorNotificationService;

    @Override
    public void handleError(Throwable t) {
        log.error("Handling error", t);
        // Build the error report
        ErrorReport errorReport = ErrorReport.builder()
                .timestamp(Instant.now())
                .errorType(t.getClass().getSimpleName())
                .errorMessage(t.getMessage())
                .stackTrace(getStackTrace(t))
                .build();
        // Persist the error report
        saveErrorReport(errorReport);
        // Send an alert for critical errors
        if (isCriticalError(t)) {
            errorNotificationService.sendCriticalAlert(errorReport);
        }
    }

    private boolean isCriticalError(Throwable t) {
        return t instanceof OutOfMemoryError ||
                t instanceof StackOverflowError ||
                t instanceof NoClassDefFoundError;
    }

    private String getStackTrace(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }

    private void saveErrorReport(ErrorReport errorReport) {
        // Save to a database or file
        log.debug("Saving error report: {}", errorReport);
    }
}
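Message acknowledgment
Manual acknowledgment
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:
            consumer:
              acknowledge-mode: manual
              auto-bind-dlq: true
// Note: this listener uses Spring AMQP's @RabbitListener directly and therefore
// bypasses the binder. Its acknowledge mode is governed by
// spring.rabbitmq.listener.simple.acknowledge-mode, not by the binding property above.
@Component
@Slf4j
public class ManualAckConsumer {
    @RabbitListener(queues = "user-events")
    public void handleMessage(@Payload String message,
                              @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                              Channel channel) throws IOException {
        try {
            log.info("Handling message: {}", message);
            // Business logic
            processMessage(message);
            // Acknowledge manually
            channel.basicAck(deliveryTag, false);
            log.info("Message acknowledged: {}", message);
        } catch (BusinessException e) {
            log.warn("Business exception, rejecting message: {}", e.getMessage());
            // Reject without requeueing
            channel.basicNack(deliveryTag, false, false);
        } catch (Exception e) {
            log.error("Processing error, rejecting and requeueing: {}", e.getMessage());
            // Reject and requeue. A message that always fails will be redelivered
            // indefinitely, so pair this with a retry limit or a dead-letter queue.
            channel.basicNack(deliveryTag, false, true);
        }
    }

    private void processMessage(String message) {
        // Simulated business processing
        if (message.contains("error")) {
            throw new BusinessException("Simulated business exception");
        }
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}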
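The listener above makes one of three decisions per message: acknowledge, reject without requeue, or reject with requeue. Separating that decision from the broker calls makes it testable without RabbitMQ. The sketch below is one way to do this in plain Java; `AckDecisionSketch`, `AckAction`, and the nested `BusinessException` are hypothetical names standing in for the application's own types.

```java
// Minimal sketch: decide the acknowledgment action from the handler outcome.
final class AckDecisionSketch {

    enum AckAction { ACK, REJECT, REQUEUE }

    // Hypothetical stand-in for the application's business exception type.
    static class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    // Runs the handler and maps its outcome to an acknowledgment action:
    // success -> ACK, business failure -> REJECT (no requeue),
    // any other runtime failure -> REQUEUE.
    static AckAction decide(Runnable handler) {
        try {
            handler.run();
            return AckAction.ACK;
        } catch (BusinessException e) {
            return AckAction.REJECT;
        } catch (RuntimeException e) {
            return AckAction.REQUEUE;
        }
    }
}
```

A listener would then translate the result into a single `basicAck` or `basicNack` call. The catch order matters: the more specific `BusinessException` must come before `RuntimeException`, otherwise business failures would be requeued.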
Transaction support
@Configuration
@EnableTransactionManagement
public class TransactionConfig {
    // Both beans are PlatformTransactionManagers, so one must be primary
    // for an unqualified @Transactional to resolve
    @Bean
    @Primary
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(
            ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}
@Component
@Slf4j
public class TransactionalConsumer {
    // Only the database work is covered by this transaction. The downstream
    // send is not rolled back with it, so consumers should be idempotent.
    @StreamListener("input")
    @Transactional(rollbackFor = Exception.class)
    public void handleTransactionalMessage(String message) {
        log.info("Handling message in a transaction: {}", message);
        try {
            // Database operation
            saveToDatabase(message);
            // Send the downstream message
            sendDownstreamMessage(message);
            log.info("Transactional handling finished: {}", message);
        } catch (Exception e) {
            log.error("Transactional handling failed, rolling back: {}", message, e);
            throw e; // Triggers the rollback
        }
    }

    private void saveToDatabase(String message) {
        // Persistence logic
        messageRepository.save(new MessageRecord(message, Instant.now()));
    }

    private void sendDownstreamMessage(String message) {
        // Send the downstream message
        downstreamChannel.send(MessageBuilder.withPayload(message).build());
    }
}
Monitoring and management
Actuator endpoints
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    bindings:
      enabled: true
    channels:
      enabled: true
    health:
      show-details: always
Custom health check
@Component
public class StreamHealthIndicator implements HealthIndicator {
    @Autowired
    private BindingService bindingService;

    @Override
    public Health health() {
        try {
            // Check the state of each binding
            Set<String> bindingNames = getBindingNames();
            Map<String, Object> details = new HashMap<>();
            boolean allHealthy = true;
            for (String bindingName : bindingNames) {
                boolean isHealthy = checkBindingHealth(bindingName);
                details.put(bindingName, isHealthy ? "UP" : "DOWN");
                if (!isHealthy) {
                    allHealthy = false;
                }
            }
            return allHealthy ? Health.up().withDetails(details).build()
                    : Health.down().withDetails(details).build();
        } catch (Exception e) {
            return Health.down()
                    .withDetail("error", e.getMessage())
                    .build();
        }
    }

    private Set<String> getBindingNames() {
        // Names of all bindings to check
        return Arrays.stream(new String[]{"input", "output"})
                .collect(Collectors.toSet());
    }

    private boolean checkBindingHealth(String bindingName) {
        // Check the health of one binding
        try {
            // Placeholder: check connection state, queue state, and so on
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
Metrics
@Component
@Slf4j
public class StreamMetrics {
    private final MeterRegistry meterRegistry;

    public StreamMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @EventListener
    public void handleMessageProcessed(MessageProcessedEvent event) {
        // Counter.increment() takes no tags; tagged meters are looked up
        // (or created on first use) through the registry
        meterRegistry.counter("stream.messages.processed",
                        "binding", event.getBinding(),
                        "type", event.getMessageType())
                .increment();
    }

    @EventListener
    public void handleMessageError(MessageErrorEvent event) {
        meterRegistry.counter("stream.messages.error",
                        "binding", event.getBinding(),
                        "error", event.getErrorType())
                .increment();
    }

    public void recordProcessingTime(String binding, Duration duration) {
        meterRegistry.timer("stream.message.processing.duration", "binding", binding)
                .record(duration);
    }
}
Case study
E-commerce order processing system
// Order event definitions
@Data
@Builder
public class OrderEvent {
    private String orderId;
    private String userId;
    private OrderStatus status;
    private BigDecimal amount;
    private List<OrderItem> items;
    private Instant timestamp;
}
@Data
@Builder
public class OrderItem {
    private String productId;
    private String productName;
    private Integer quantity;
    private BigDecimal price;
}
// Order service
@Service
@Slf4j
public class OrderService {
    @Autowired
    @Qualifier("orderOutput")
    private MessageChannel orderOutput;

    public Order createOrder(CreateOrderRequest request) {
        // 1. Create the order
        Order order = new Order();
        order.setId(UUID.randomUUID().toString());
        order.setUserId(request.getUserId());
        order.setItems(request.getItems());
        order.setAmount(calculateAmount(request.getItems()));
        order.setStatus(OrderStatus.CREATED);
        order.setCreatedAt(Instant.now());
        // 2. Save the order
        orderRepository.save(order);
        // 3. Publish the order-created event
        OrderEvent event = OrderEvent.builder()
                .orderId(order.getId())
                .userId(order.getUserId())
                .status(OrderStatus.CREATED)
                .amount(order.getAmount())
                .items(convertToOrderItems(request.getItems()))
                .timestamp(Instant.now())
                .build();
        publishOrderEvent(event);
        return order;
    }

    private void publishOrderEvent(OrderEvent event) {
        Message<OrderEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader("event-type", "order")
                .setHeader("order-id", event.getOrderId())
                .setHeader("user-id", event.getUserId())
                .build();
        boolean sent = orderOutput.send(message);
        if (sent) {
            log.info("Order event published: {}", event);
        } else {
            log.error("Failed to publish order event: {}", event);
            throw new RuntimeException("Failed to publish order event");
        }
    }
}
// Inventory service listener
@Component
@Slf4j
public class InventoryEventHandler {
    @Autowired
    private InventoryService inventoryService;

    @StreamListener(value = "orderInput",
            condition = "headers['event-type']=='order'")
    public void handleOrderCreated(OrderEvent orderEvent) {
        // The status check lives in the method body: listener conditions are
        // evaluated before the payload is converted, so payload properties are
        // not reliably available in the SpEL condition.
        if (orderEvent.getStatus() != OrderStatus.CREATED) {
            return;
        }
        log.info("Handling order-created event, checking stock: {}", orderEvent.getOrderId());
        try {
            // Check stock
            boolean stockAvailable = inventoryService.checkStock(orderEvent.getItems());
            if (stockAvailable) {
                // Reserve stock
                inventoryService.reserveStock(orderEvent.getOrderId(), orderEvent.getItems());
                // Publish the stock-reserved event
                publishInventoryReservedEvent(orderEvent);
            } else {
                // Publish the insufficient-stock event
                publishInventoryInsufficientEvent(orderEvent);
            }
        } catch (Exception e) {
            log.error("Stock check failed: {}", orderEvent.getOrderId(), e);
            publishInventoryErrorEvent(orderEvent, e.getMessage());
        }
    }

    private void publishInventoryReservedEvent(OrderEvent orderEvent) {
        InventoryEvent event = InventoryEvent.builder()
                .orderId(orderEvent.getOrderId())
                .status(InventoryStatus.RESERVED)
                .timestamp(Instant.now())
                .build();
        inventoryOutput.send(MessageBuilder.withPayload(event).build());
        log.info("Stock-reserved event published: {}", event);
    }
}
// Payment service listener
@Component
@Slf4j
public class PaymentEventHandler {
    @Autowired
    private PaymentService paymentService;

    @StreamListener("inventoryInput")
    public void handleInventoryReserved(InventoryEvent inventoryEvent) {
        // Filter on status here rather than in a payload-based listener condition,
        // which is evaluated before the payload is converted
        if (inventoryEvent.getStatus() != InventoryStatus.RESERVED) {
            return;
        }
        log.info("Handling stock-reserved event, starting payment: {}", inventoryEvent.getOrderId());
        try {
            // Load the order
            Order order = orderService.getOrder(inventoryEvent.getOrderId());
            // Process the payment
            PaymentResult result = paymentService.processPayment(
                    order.getUserId(), order.getAmount(), order.getId());
            if (result.isSuccess()) {
                publishPaymentSuccessEvent(order, result);
            } else {
                publishPaymentFailedEvent(order, result);
            }
        } catch (Exception e) {
            log.error("Payment processing failed: {}", inventoryEvent.getOrderId(), e);
            publishPaymentErrorEvent(inventoryEvent.getOrderId(), e.getMessage());
        }
    }
}
// Notification service listener
@Component
@Slf4j
public class NotificationEventHandler {
    @Autowired
    private NotificationService notificationService;

    @StreamListener("paymentInput")
    public void handlePaymentEvent(PaymentEvent paymentEvent) {
        log.info("Handling payment event: {}", paymentEvent);
        switch (paymentEvent.getStatus()) {
            case SUCCESS:
                sendOrderSuccessNotification(paymentEvent);
                break;
            case FAILED:
                sendOrderFailedNotification(paymentEvent);
                break;
            case ERROR:
                sendPaymentErrorNotification(paymentEvent);
                break;
        }
    }

    private void sendOrderSuccessNotification(PaymentEvent event) {
        try {
            // Load the user
            User user = userService.getUser(event.getUserId());
            // Send the email notification
            notificationService.sendEmail(
                    user.getEmail(),
                    "Order payment successful",
                    String.format("Your order %s has been paid. Amount: %s",
                            event.getOrderId(), event.getAmount())
            );
            // Send the SMS notification
            notificationService.sendSms(
                    user.getPhone(),
                    String.format("Order %s paid, amount %s",
                            event.getOrderId(), event.getAmount())
            );
            log.info("Order success notification sent: {}", event.getOrderId());
        } catch (Exception e) {
            log.error("Failed to send order success notification: {}", event.getOrderId(), e);
        }
    }
}
Best practices
1. Message design principles
// A well-designed message
@Data
@Builder
public class UserEvent {
    private String eventId;                // Unique event identifier
    private String eventType;              // Event type
    private String userId;                 // Business identifier
    private Instant timestamp;             // Timestamp
    private String version;                // Schema version
    private Map<String, Object> data;      // Business data
    private Map<String, String> metadata;  // Metadata
}
// Message version compatibility
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "version")
@JsonSubTypes({
        @JsonSubTypes.Type(value = UserEventV1.class, name = "v1"),
        @JsonSubTypes.Type(value = UserEventV2.class, name = "v2")
})
public abstract class BaseUserEvent {
    protected String version;
}
2. Idempotent processing
@Component
@Slf4j
public class IdempotentMessageHandler {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @StreamListener("input")
    public void handleMessage(@Payload UserEvent event,
                              @Headers Map<String, Object> headers) {
        String messageId = event.getEventId();
        String idempotencyKey = "msg_processed:" + messageId;
        // Claim the message atomically. A separate hasKey check followed by set
        // would let two concurrent deliveries both pass the check.
        Boolean claimed = redisTemplate.opsForValue()
                .setIfAbsent(idempotencyKey, "processed", Duration.ofHours(24));
        if (!Boolean.TRUE.equals(claimed)) {
            log.info("Message already processed, skipping: {}", messageId);
            return;
        }
        try {
            // Business logic
            processBusinessLogic(event);
            log.info("Message processed: {}", messageId);
        } catch (Exception e) {
            log.error("Message processing failed: {}", messageId, e);
            // Release the claim so that a retry can process the message
            redisTemplate.delete(idempotencyKey);
            throw e;
        }
    }
}
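An idempotent handler needs the "already processed?" check and the "mark as processed" step to be a single atomic operation; with two separate steps, two concurrent deliveries of the same event can both pass the check. The sketch below shows the pattern with an in-memory set, where `Set.add` on a concurrent set plays the role that an atomic set-if-absent plays in Redis. `IdempotentHandlerSketch` and `handleOnce` are hypothetical names, and an in-memory set is only suitable for a single instance with no restart guarantees.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Minimal sketch: run a handler at most once per event ID.
final class IdempotentHandlerSketch {

    // IDs that have been claimed by a handler invocation.
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    // Returns true if the handler ran, false if the event ID had already been claimed.
    boolean handleOnce(String eventId, Consumer<String> handler) {
        // add() is an atomic check-and-mark: only one caller gets true per ID
        if (!claimed.add(eventId)) {
            return false;
        }
        try {
            handler.accept(eventId);
            return true;
        } catch (RuntimeException e) {
            // Release the claim so that a redelivery can retry the event
            claimed.remove(eventId);
            throw e;
        }
    }
}
```

Releasing the claim on failure is a design choice: it favors eventual processing over strict at-most-once behavior, which matches the retry-based error handling used elsewhere in this article.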
3. Message serialization
@Configuration
public class StreamSerializationConfig {
    // AvroMessageConverter and ProtobufMessageConverter are assumed to be
    // available on the classpath or provided by the application
    @Bean
    public MessageConverter messageConverter() {
        CompositeMessageConverter converter = new CompositeMessageConverter(
                Arrays.asList(
                        new MappingJackson2MessageConverter(),
                        new AvroMessageConverter(),
                        new ProtobufMessageConverter()
                )
        );
        return converter;
    }

    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
        return mapper;
    }
}
4. Performance monitoring
@Component
@Slf4j
public class StreamPerformanceMonitor {
    private final MeterRegistry meterRegistry;
    private final Timer messageProcessingTimer;

    public StreamPerformanceMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.messageProcessingTimer = Timer.builder("stream.message.processing.time")
                .description("Message processing time")
                .register(meterRegistry);
    }

    @EventListener
    public void onMessageReceived(MessageReceivedEvent event) {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            // Process the message
            processMessage(event.getMessage());
            // Tagged counters are looked up through the registry;
            // Counter.increment() itself takes no tags
            meterRegistry.counter("stream.message.count",
                    "binding", event.getBinding(),
                    "status", "success").increment();
        } catch (Exception e) {
            meterRegistry.counter("stream.message.count",
                    "binding", event.getBinding(),
                    "status", "error").increment();
            throw e;
        } finally {
            sample.stop(messageProcessingTimer);
        }
    }
}
5. Production configuration
# Production configuration
spring:
  cloud:
    stream:
      default-binder: rabbit
      bindings:
        input:
          destination: prod-user-events
          group: user-service-group
          content-type: application/json
          consumer:
            max-attempts: 3
            back-off-initial-interval: 2000
            back-off-multiplier: 2.0
            back-off-max-interval: 30000
            use-native-decoding: true
        output:
          destination: prod-user-events
          content-type: application/json
          producer:
            partition-count: 8
            required-groups: user-service-group
            use-native-encoding: true
      rabbit:
        binder:
          connection-name-prefix: stream-prod
          admin-addresses: rabbit-cluster:15672
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true
              dlq-ttl: 86400000
              max-length: 10000
              prefetch: 100
          output:
            producer:
              delivery-mode: PERSISTENT
              mandatory: true
              confirm-ack-channel: confirmAckChannel
  rabbitmq:
    addresses: rabbit-node1:5672,rabbit-node2:5672,rabbit-node3:5672
    username: ${RABBIT_USERNAME}
    password: ${RABBIT_PASSWORD}
    virtual-host: /prod
    connection-timeout: 30000
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 50
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 2000
          multiplier: 2.0
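Summary
Spring Cloud Stream simplifies building message-based microservices. After working through this article, you should be comfortable with:
- Core concepts: binders, bindings, and channels
- Message handling: development patterns for producers and consumers
- Advanced features: message routing, partitioning, and error handling
- Operations: health checks, metrics, and troubleshooting
- Practical application: using Stream in a real business scenario
Suggestions for further study
- Binder internals: learn how the different message brokers behave
- Performance tuning: optimize for high-concurrency workloads
- Cloud-native integration: integrate with Kubernetes and a service mesh
- Event sourcing: build an event-driven data architecture
- CQRS: combine with Command Query Responsibility Segregation
Spring Cloud Stream is an important tool for building modern microservice architectures, and mastering it is key to building loosely coupled, scalable distributed systems.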