RabbitMQ 实战教程:从零构建微服务电商订单系统
📌 摘要:本教程通过构建一个完整的电商订单微服务系统,带你从零掌握 RabbitMQ 在 Spring Boot 项目中的实战应用。 项目采用 Spring Boot 2.7 + Spring AMQP + MyBatis-Plus + MySQL 8.0 + Redis 构建后端微服务架构(订单、库存、通知、网关四大服务),前端使用 Vue3 + Element Plus 提供可视化操作界面,基础设施通过 Docker ***pose 一键部署(RabbitMQ 3.12 + MySQL + Redis)。教程涵盖 RabbitMQ 核心概念、Direct/Topic/Fanout 三种交换机实战、生产者确认与退回机制、消息持久化、消费者手动 ACK、死信队列配置、消息幂等性处理(基于 Redis 去重)、消息日志追踪等生产级特性。
🎯 本教程的独特优势
| 其他教程 | 本教程 |
|---|---|
| ❌ 只讲理论概念 | ✅ 理论 + 完整项目实战 |
| ❌ 简单的 Hello World | ✅ 真实的微服务电商系统 |
| ❌ 代码片段零散 | ✅ 完整可运行的代码仓库 |
| ❌ 缺少环境搭建 | ✅ Docker 一键启动全部基础设施 |
| ❌ 没有前端界面 | ✅ Vue3 可视化操作界面 |
| ❌ 不讲消息可靠性 | ✅ 生产者确认、消费者确认、死信队列全覆盖 |
| ❌ 缺少实战场景 | ✅ 订单、库存、通知真实业务场景 |
| ❌ 看完还是不会用 | ✅ 跟着教程走一遍,立即上手实战 |
💡 你将学到什么
基础篇
- ✅ RabbitMQ 核心概念(Exchange、Queue、Binding)
- ✅ 四种交换机类型的实际应用
- ✅ 消息生产者和消费者的实现
进阶篇
- ✅ 消息可靠性保证(生产者确认、消息持久化、消费者确认)
- ✅ 死信队列的配置和使用
- ✅ 消息幂等性处理(防止重复消费)
实战篇
- ✅ 完整的微服务项目(订单、库存、通知、网关)
- ✅ Spring Boot 集成 RabbitMQ 最佳实践
- ✅ Docker ***pose 一键部署
- ✅ 前后端完整代码
🎁 附赠内容
- 📦 完整源码:开箱即用的 Maven + Spring Boot 项目
- 🐳 Docker 环境:RabbitMQ + MySQL + Redis 一键启动
- 🎨 前端界面:Vue3 + Element Plus 管理界面
- 📚 详细文档:启动指南、Docker 配置详解、学习笔记
- 💬 代码注释:每个关键点都有详细注释
🚀 学完后你能做什么
- ✅ 在自己的项目中应用 RabbitMQ
- ✅ 设计高可用的消息队列架构
- ✅ 解决消息丢失、重复消费等问题
- ✅ 面试时自信地讲解 RabbitMQ 原理和实践
- ✅ 快速上手 Kafka、RocketMQ 等其他消息队列
📖 目录
- 项目介绍
- 快速开始
- RabbitMQ 核心概念
- 项目架构详解
- 消息生产者实现
- 消息消费者实现
- 消息可靠性保证
- 死信队列实战
- 消息幂等性处理
- 性能优化技巧
- 监控和调试
- 常见问题
- 总结与展望
1. 项目介绍
1.1 业务场景
我们构建了一个电商订单系统,包含以下核心流程:
用户创建订单
↓
订单服务保存订单 → 发送消息到 RabbitMQ
↓ ↓
返回成功 ┌──────┴──────┐
↓ ↓
库存服务扣减库存 通知服务发送通知
(异步) (异步)
为什么用 RabbitMQ?
传统同步调用的问题:
// ❌ 传统方式:串行调用,响应慢
orderService.createOrder(order);
inventoryService.reduceStock(order); // 等待库存服务
notificationService.sendNotify(order); // 等待通知服务
// 总耗时 = 订单服务 + 库存服务 + 通知服务
使用 RabbitMQ 后:
// ✅ 异步方式:发完消息就返回
orderService.createOrder(order);
rabbitMQProducer.sendMessage("inventory", orderMessage);
rabbitMQProducer.sendMessage("notification", orderMessage);
// 总耗时 = 订单服务 + 发送消息(几毫秒)
1.2 技术栈
后端:
- Spring Boot 2.7.18
- Spring AMQP(RabbitMQ)
- MyBatis-Plus
- MySQL 8.0
- Redis
前端:
- Vue 3
- Element Plus
- Axios
基础设施:
- RabbitMQ 3.12(Docker)
- MySQL 8.0(Docker)
- Redis 7(Docker)
1.3 项目结构
rabbitmq-microservice-demo/
├── order-service/ # 订单服务(生产者)
│ └── rabbitmq/
│ ├── RabbitMQConfig.java # RabbitMQ 配置
│ └── RabbitMQProducer.java # 消息发送
├── inventory-service/ # 库存服务(消费者)
│ └── rabbitmq/
│ └── InventoryConsumer.java # 消息接收
├── notification-service/ # 通知服务(消费者)
│ └── rabbitmq/
│ └── NotificationConsumer.java
├── gateway-service/ # API 网关
├── frontend/ # Vue3 前端
├── docker-***pose.yml # 一键启动脚本
└── db/init.sql # 数据库初始化
2. 快速开始
2.1 前置准备
- JDK 1.8+
- Maven 3.6+
- Docker Desktop
- Node.js 16+
2.2 启动基础设施
# 克隆项目
git clone [你的项目地址]
cd rabbitmq-microservice-demo
# 启动 RabbitMQ + MySQL + Redis
docker-***pose up -d
# 等待 30 秒让服务完全启动
2.3 访问 RabbitMQ 管理界面
浏览器打开:http://localhost:15672
- 用户名:
admin - 密码:
admin123
你会看到:
- 📊 Dashboard:系统概览
- 🔄 Exchanges:交换机列表
- 📮 Queues:队列列表
- 🔗 Connections:连接状态
2.4 启动后端服务
# 方式1:使用 IDEA
# 按顺序运行:
# 1. OrderServiceApplication (8081)
# 2. InventoryServiceApplication (8082)
# 3. NotificationServiceApplication (8083)
# 4. GatewayServiceApplication (8080)
# 方式2:使用命令行
cd order-service && mvn spring-boot:run
2.5 启动前端
cd frontend
npm install
npm run dev
访问:http://localhost:5173
2.6 测试完整流程
- 在前端创建一个订单
- 观察后端控制台的日志输出
- 在 RabbitMQ 管理界面看消息流转
- 查看数据库中的数据变化
恭喜!🎉 环境搭建完成,开始学习吧!
3. RabbitMQ 核心概念
3.1 基本组件
Producer → Exchange → Queue → Consumer
生产者 交换机 队列 消费者
Producer(生产者)
是什么?
- 发送消息的应用程序
在我们项目中:
- 订单服务就是生产者
- 位置:
order-service/src/main/java/***/example/order/rabbitmq/RabbitMQProducer.java
代码示例:
// 订单服务发送消息
public void createOrder(OrderCreateDTO dto) {
// 1. 保存订单
orderRepository.save(order);
// 2. 发送库存扣减消息
rabbitMQProducer.sendMessage(
"inventory.exchange",
"inventory.reduce",
new InventoryReduceDTO(orderNo, productId, quantity)
);
// 3. 发送订单通知消息
rabbitMQProducer.sendMessage(
"notification.exchange",
"notification.order",
new OrderNotificationDTO(orderNo, userId, ...)
);
}
Exchange(交换机)
是什么?
- 接收生产者的消息
- 根据路由规则将消息分发到队列
在我们项目中定义了 3 个交换机:
// 位置:order-service/.../RabbitMQConfig.java
// 1. 库存交换机(Direct 类型)
@Bean
public DirectExchange inventoryExchange() {
return new DirectExchange("inventory.exchange", true, false);
}
// 2. 通知交换机(Topic 类型)
@Bean
public TopicExchange notificationExchange() {
return new TopicExchange("notification.exchange", true, false);
}
// 3. 死信交换机(处理失败消息)
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead-letter.exchange", true, false);
}
Queue(队列)
是什么?
- 存储消息的容器
- 消费者从队列中获取消息
在我们项目中定义了 3 个队列:
// 1. 库存队列
@Bean
public Queue inventoryQueue() {
return QueueBuilder.durable("inventory.queue")
.deadLetterExchange("dead-letter.exchange") // 配置死信交换机
.deadLetterRoutingKey("dead-letter")
.build();
}
// 2. 通知队列
@Bean
public Queue notificationQueue() {
return QueueBuilder.durable("notification.queue")
.deadLetterExchange("dead-letter.exchange")
.deadLetterRoutingKey("dead-letter")
.build();
}
// 3. 死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead-letter.queue").build();
}
Consumer(消费者)
是什么?
- 接收并处理消息的应用程序
在我们项目中:
- 库存服务消费库存队列
- 通知服务消费通知队列
代码示例:
// 位置:inventory-service/.../InventoryConsumer.java
@RabbitListener(queues = "inventory.queue")
public void handleInventoryReduce(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 解析消息
InventoryReduceDTO dto = JSON.parseObject(
new String(message.getBody()),
InventoryReduceDTO.class
);
// 2. 执行业务逻辑(扣减库存)
inventoryService.reduceInventory(
dto.getOrderNo(),
dto.getProductId(),
dto.getQuantity()
);
// 3. 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 4. 处理失败,拒绝消息
channel.basi***ack(deliveryTag, false, false);
}
}
3.2 交换机类型详解
Direct Exchange(直连交换机)
路由规则: routing key 必须完全匹配
使用场景: 需要精确路由的场景
在我们项目中:
// 库存交换机使用 Direct 类型
@Bean
public DirectExchange inventoryExchange() {
return new DirectExchange("inventory.exchange", true, false);
}
// 绑定:routing-key = "inventory.reduce"
@Bean
public Binding inventoryBinding() {
return BindingBuilder
.bind(inventoryQueue())
.to(inventoryExchange())
.with("inventory.reduce"); // 必须精确匹配
}
消息流转:
发送消息:routing-key = "inventory.reduce"
↓
inventory.exchange (Direct)
↓
匹配 routing-key = "inventory.reduce"
↓
路由到 inventory.queue
Topic Exchange(主题交换机)
路由规则: 支持通配符
-
*匹配一个单词 -
#匹配零个或多个单词
使用场景: 需要灵活路由的场景
在我们项目中:
// 通知交换机使用 Topic 类型
@Bean
public TopicExchange notificationExchange() {
return new TopicExchange("notification.exchange", true, false);
}
// 绑定
@Bean
public Binding notificationBinding() {
return BindingBuilder
.bind(notificationQueue())
.to(notificationExchange())
.with("notification.order"); // 可以用通配符:notification.#
}
通配符示例:
notification.* 匹配 notification.order、notification.sms
notification.# 匹配 notification.order.create、notification.sms.send
notification.order.* 匹配 notification.order.create、notification.order.cancel
Fanout Exchange(扇形交换机)
路由规则: 广播到所有绑定的队列,忽略 routing key
使用场景: 广播通知、日志收集
代码示例:
// 创建 Fanout 交换机
@Bean
public FanoutExchange broadcastExchange() {
return new FanoutExchange("broadcast.exchange", true, false);
}
// 绑定多个队列
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(broadcastExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(broadcastExchange());
}
// 发送消息(routing key 无效)
rabbitTemplate.convertAndSend("broadcast.exchange", "", message);
// 所有绑定的队列都会收到消息
3.3 Binding(绑定)
是什么?
- Exchange 和 Queue 之间的关系
- 定义路由规则
在我们项目中:
@Bean
public Binding inventoryBinding() {
return BindingBuilder
.bind(inventoryQueue()) // 队列
.to(inventoryExchange()) // 交换机
.with("inventory.reduce"); // routing key
}
绑定关系可视化:
inventory.exchange (Direct)
↓ (routing-key: inventory.reduce)
inventory.queue
↓
InventoryConsumer
notification.exchange (Topic)
↓ (routing-key: notification.order)
notification.queue
↓
NotificationConsumer
4. 项目架构详解
4.1 整体架构图
┌─────────────────────────────────────────────────────┐
│ 前端界面 (Vue3) │
│ http://localhost:5173 │
└─────────────────┬───────────────────────────────────┘
│ HTTP
┌─────────▼──────────┐
│ API 网关 (8080) │
└─────────┬──────────┘
│
┌────────────┼────────────┐
│ │ │
┌────▼────┐ ┌───▼────┐ ┌───▼─────┐
│ 订单服务 │ │库存服务 │ │通知服务 │
│ (8081) │ │ (8082) │ │ (8083) │
│ 生产者 │ │ 消费者 │ │ 消费者 │
└────┬────┘ └───▲────┘ └───▲─────┘
│ │ │
│ ┌────────┴────────────┴────────┐
└──► RabbitMQ (Docker:5672) │
│ - inventory.queue │
│ - notification.queue │
│ - dead-letter.queue │
└───────────────────────────────┘
│ │
┌────────▼────┐ ┌────▼─────┐
│MySQL (3306) │ │Redis(6379)│
│ (Docker) │ │ (Docker) │
└─────────────┘ └───────────┘
4.2 消息流转流程
完整的订单创建流程:
① 用户在前端提交订单
↓
② 前端 POST → API网关 → 订单服务
↓
③ 订单服务保存订单到 MySQL
↓
④ 订单服务发送消息到 RabbitMQ
├─→ inventory.exchange → inventory.queue
└─→ notification.exchange → notification.queue
↓
⑤ 返回成功给前端(异步处理)
↓
⑥ 库存服务监听 inventory.queue
├─ 接收消息
├─ 扣减库存(乐观锁)
├─ 记录库存流水
└─ ACK 确认
↓
⑦ 通知服务监听 notification.queue
├─ 接收消息
├─ 发送通知(模拟邮件/短信)
├─ 记录通知日志
└─ ACK 确认
↓
⑧ 用户可以在前端查看订单、库存、通知状态
时序图:
用户 订单服务 RabbitMQ 库存服务 通知服务
│ │ │ │ │
├─创建订单─►│ │ │ │
│ ├─保存订单─►DB │ │
│ ├─发消息───►│ │ │
│◄─返回成功─┤ │ │ │
│ │ ├─推送────►│ │
│ │ │ ├─扣库存───►DB
│ │ │ ├─ACK─────►│
│ │ ├─推送─────────────────►│
│ │ │ │ ├─发通知─►
│ │ │ │ ├─ACK───►│
│ │ │ │ │
4.3 项目目录结构
订单服务(生产者):
order-service/
└── src/main/java/***/example/order/
├── controller/
│ └── OrderController.java # REST API
├── service/
│ └── OrderService.java # 业务逻辑
├── rabbitmq/
│ ├── RabbitMQConfig.java # 核心配置 ⭐
│ └── RabbitMQProducer.java # 消息发送 ⭐
├── entity/
│ └── Order.java # 订单实体
└── dto/
├── InventoryReduceDTO.java # 库存消息DTO
└── OrderNotificationDTO.java # 通知消息DTO
库存服务(消费者):
inventory-service/
└── src/main/java/***/example/inventory/
├── service/
│ └── InventoryService.java # 库存业务
└── rabbitmq/
└── InventoryConsumer.java # 消息接收 ⭐
5. 消息生产者实现
5.1 配置 RabbitMQ
位置: order-service/src/main/java/***/example/order/rabbitmq/RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
// 从配置文件读取队列名称
@Value("${rabbitmq.inventory.exchange}")
private String inventoryExchange;
@Value("${rabbitmq.inventory.queue}")
private String inventoryQueue;
@Value("${rabbitmq.inventory.routing-key}")
private String inventoryRoutingKey;
// ========== 创建交换机 ==========
@Bean
public DirectExchange inventoryExchange() {
// 参数1:交换机名称
// 参数2:是否持久化(重启后是否存在)
// 参数3:是否自动删除
return new DirectExchange(inventoryExchange, true, false);
}
// ========== 创建队列 ==========
@Bean
public Queue inventoryQueue() {
return QueueBuilder.durable(inventoryQueue)
// 配置死信交换机(消息处理失败后的去处)
.deadLetterExchange("dead-letter.exchange")
.deadLetterRoutingKey("dead-letter")
.build();
}
// ========== 绑定交换机和队列 ==========
@Bean
public Binding inventoryBinding() {
return BindingBuilder
.bind(inventoryQueue()) // 队列
.to(inventoryExchange()) // 交换机
.with(inventoryRoutingKey); // routing key
}
// ========== 配置 RabbitTemplate ==========
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置消息转换器为 JSON
template.setMessageConverter(jackson2JsonMessageConverter());
// 消息路由失败时是否退回
template.setMandatory(true);
return template;
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
// ========== 配置 RabbitAdmin ==========
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}
配置文件: order-service/src/main/resources/application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
# 生产者确认
publisher-confirm-type: correlated
# 生产者退回
publisher-returns: true
template:
mandatory: true
# 自定义配置
rabbitmq:
inventory:
exchange: inventory.exchange
queue: inventory.queue
routing-key: inventory.reduce
notification:
exchange: notification.exchange
queue: notification.queue
routing-key: notification.order
5.2 实现消息发送
位置: order-service/src/main/java/***/example/order/rabbitmq/RabbitMQProducer.java
@Slf4j
@***ponent
public class RabbitMQProducer implements
RabbitTemplate.ConfirmCallback, // 确认回调
RabbitTemplate.ReturnsCallback { // 退回回调
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageLogMapper messageLogMapper;
@PostConstruct
public void init() {
// 设置回调
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 发送消息到 RabbitMQ
*/
public void sendMessage(String exchange, String routingKey, Object message) {
try {
// 1. 生成消息唯一ID
String messageId = UUID.randomUUID().toString();
// 2. 保存消息记录到数据库(用于消息追踪)
MessageLog messageLog = new MessageLog();
messageLog.setMessageId(messageId);
messageLog.setContent(JSON.toJSONString(message));
messageLog.setExchange(exchange);
messageLog.setRoutingKey(routingKey);
messageLog.setStatus(0); // 发送中
messageLogMapper.insert(messageLog);
// 3. 发送消息
CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
log.info("【RabbitMQ生产者】发送消息 => messageId: {}, exchange: {}, routingKey: {}",
messageId, exchange, routingKey);
} catch (Exception e) {
log.error("【RabbitMQ生产者】发送消息失败", e);
throw new RuntimeException("发送消息失败", e);
}
}
/**
* 消息确认回调
* 确认消息是否到达 Exchange
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String messageId = correlationData.getId();
if (ack) {
// ✅ 消息成功到达交换机
log.info("【RabbitMQ生产者】消息确认成功 => messageId: {}", messageId);
// 更新消息状态为已发送
MessageLog messageLog = messageLogMapper.selectByMessageId(messageId);
if (messageLog != null) {
messageLog.setStatus(1);
messageLogMapper.updateById(messageLog);
}
} else {
// ❌ 消息未到达交换机
log.error("【RabbitMQ生产者】消息确认失败 => messageId: {}, cause: {}",
messageId, cause);
// 更新消息状态为发送失败
MessageLog messageLog = messageLogMapper.selectByMessageId(messageId);
if (messageLog != null) {
messageLog.setStatus(2);
messageLog.setRetryCount(messageLog.getRetryCount() + 1);
messageLogMapper.updateById(messageLog);
}
// TODO: 实现重试逻辑
}
}
/**
* 消息退回回调
* 当消息从交换机路由到队列失败时触发
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("【RabbitMQ生产者】消息退回 => message: {}, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
new String(returned.getMessage().getBody()),
returned.getReplyCode(),
returned.getReplyText(),
returned.getExchange(),
returned.getRoutingKey());
// TODO: 实现补偿逻辑
}
}
5.3 在业务代码中使用
位置: order-service/src/main/java/***/example/order/service/OrderService.java
@Service
public class OrderService {
@Autowired
private RabbitMQProducer rabbitMQProducer;
@Value("${rabbitmq.inventory.exchange}")
private String inventoryExchange;
@Value("${rabbitmq.inventory.routing-key}")
private String inventoryRoutingKey;
@Transactional
public Order createOrder(OrderCreateDTO dto) {
// 1. 生成订单编号
String orderNo = "ORD" + IdUtil.getSnowflakeNextIdStr();
// 2. 创建订单对象
Order order = new Order();
order.setOrderNo(orderNo);
order.setUserId(dto.getUserId());
order.setProductId(dto.getProductId());
order.setQuantity(dto.getQuantity());
// ... 设置其他字段
// 3. 保存订单到数据库
save(order);
log.info("【订单服务】创建订单成功 => orderNo: {}", orderNo);
// 4. 发送库存扣减消息
InventoryReduceDTO inventoryMessage = new InventoryReduceDTO(
orderNo,
dto.getProductId(),
dto.getQuantity()
);
rabbitMQProducer.sendMessage(
inventoryExchange,
inventoryRoutingKey,
inventoryMessage
);
// 5. 发送订单通知消息
OrderNotificationDTO notificationMessage = new OrderNotificationDTO(
orderNo,
dto.getUserId(),
productName,
dto.getQuantity(),
totalAmount,
1 // 通知类型:订单创建
);
rabbitMQProducer.sendMessage(
notificationExchange,
notificationRoutingKey,
notificationMessage
);
return order;
}
}
6. 消息消费者实现
6.1 库存服务消费者
位置: inventory-service/src/main/java/***/example/inventory/rabbitmq/InventoryConsumer.java
@Slf4j
@***ponent
public class InventoryConsumer {
@Autowired
private InventoryService inventoryService;
/**
* 监听库存队列
* 处理库存扣减消息
*/
@RabbitListener(queues = "${rabbitmq.inventory.queue}")
public void handleInventoryReduce(Message message, Channel channel)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 获取消息内容
String messageBody = new String(message.getBody());
log.info("【RabbitMQ消费者】接收到库存扣减消息 => {}", messageBody);
// 2. 解析消息
InventoryReduceDTO dto = JSON.parseObject(
messageBody,
InventoryReduceDTO.class
);
// 3. 处理业务逻辑 - 扣减库存
inventoryService.reduceInventory(
dto.getOrderNo(),
dto.getProductId(),
dto.getQuantity()
);
// 4. 手动确认消息(ACK)
// 参数1:deliveryTag 消息标签
// 参数2:multiple 是否批量确认
channel.basicAck(deliveryTag, false);
log.info("【RabbitMQ消费者】消息处理成功,已确认 => orderNo: {}",
dto.getOrderNo());
} catch (Exception e) {
log.error("【RabbitMQ消费者】消息处理失败 => deliveryTag: {}, error: {}",
deliveryTag, e.getMessage(), e);
// 判断是否已经重试过
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (redelivered) {
// 已经重试过,拒绝消息并不重新入队(进入死信队列)
// 参数1:deliveryTag
// 参数2:multiple 是否批量
// 参数3:requeue 是否重新入队
channel.basi***ack(deliveryTag, false, false);
log.error("【RabbitMQ消费者】消息已重试,拒绝并进入死信队列 => deliveryTag: {}",
deliveryTag);
} else {
// 第一次失败,拒绝消息并重新入队进行重试
channel.basi***ack(deliveryTag, false, true);
log.warn("【RabbitMQ消费者】消息处理失败,重新入队重试 => deliveryTag: {}",
deliveryTag);
}
}
}
/**
* 监听死信队列
* 处理多次重试失败的消息
*/
@RabbitListener(queues = "dead-letter.queue")
public void handleDeadLetter(Message message, Channel channel)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String messageBody = new String(message.getBody());
log.error("【RabbitMQ死信队列】接收到死信消息 => {}", messageBody);
// 这里可以:
// 1. 保存到数据库,由人工处理
// 2. 发送告警通知
// 3. 记录到日志文件
// 确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("【RabbitMQ死信队列】处理失败", e);
channel.basi***ack(deliveryTag, false, false);
}
}
}
6.2 通知服务消费者
位置: notification-service/src/main/java/***/example/notification/rabbitmq/NotificationConsumer.java
@Slf4j
@***ponent
public class NotificationConsumer {
@Autowired
private NotificationService notificationService;
/**
* 监听通知队列
*/
@RabbitListener(queues = "${rabbitmq.notification.queue}")
public void handleOrderNotification(Message message, Channel channel)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 获取消息内容
String messageBody = new String(message.getBody());
log.info("【RabbitMQ消费者】接收到订单通知消息 => {}", messageBody);
// 2. 解析消息
OrderNotificationDTO dto = JSON.parseObject(
messageBody,
OrderNotificationDTO.class
);
// 3. 构建通知内容
String content = String.format(
"尊敬的用户,您的订单 %s 已创建成功!\n" +
"商品:%s\n" +
"数量:%d\n" +
"金额:¥%.2f\n" +
"感谢您的购买!",
dto.getOrderNo(),
dto.getProductName(),
dto.getQuantity(),
dto.getTotalAmount()
);
// 4. 发送通知
notificationService.sendNotification(
dto.getOrderNo(),
dto.getUserId(),
content,
dto.getType()
);
// 5. 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("【RabbitMQ消费者】通知发送成功 => orderNo: {}",
dto.getOrderNo());
} catch (Exception e) {
log.error("【RabbitMQ消费者】处理通知消息失败 => deliveryTag: {}",
deliveryTag, e);
// 通知发送失败通常不需要重试,直接拒绝
channel.basi***ack(deliveryTag, false, false);
}
}
}
6.3 消费者配置
位置: inventory-service/src/main/resources/application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
listener:
simple:
# 手动确认消息
acknowledge-mode: manual
# 消费者最小数量
concurrency: 1
# 消费者最大数量
max-concurrency: 10
# 限流:每次只处理一条消息
prefetch: 1
# 是否支持重试
retry:
enabled: true
# 最大重试次数
max-attempts: 3
# 重试间隔(毫秒)
initial-interval: 3000
rabbitmq:
inventory:
queue: inventory.queue
7. 消息可靠性保证
7.1 生产者确认(Publisher Confirm)
问题: 如何确保消息成功发送到 RabbitMQ?
解决方案: 开启生产者确认机制
配置
spring:
rabbitmq:
# 开启生产者确认
publisher-confirm-type: correlated # SIMPLE | CORRELATED | NONE
# 开启生产者退回
publisher-returns: true
template:
# 消息路由失败时是否退回
mandatory: true
实现确认回调
@***ponent
public class RabbitMQProducer implements RabbitTemplate.ConfirmCallback {
/**
* 消息是否成功到达 Exchange
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String messageId = correlationData.getId();
if (ack) {
// ✅ 成功到达交换机
log.info("消息发送成功 => messageId: {}", messageId);
// 更新数据库:消息状态 = 已发送
} else {
// ❌ 未到达交换机
log.error("消息发送失败 => messageId: {}, cause: {}", messageId, cause);
// 更新数据库:消息状态 = 发送失败
// 实现重试逻辑
}
}
}
实现退回回调
@***ponent
public class RabbitMQProducer implements RabbitTemplate.ReturnsCallback {
/**
* 消息从 Exchange 路由到 Queue 失败时触发
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息退回 => message: {}, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
new String(returned.getMessage().getBody()),
returned.getReplyCode(),
returned.getReplyText(),
returned.getExchange(),
returned.getRoutingKey());
// 实现补偿逻辑:
// 1. 记录到数据库
// 2. 发送告警
// 3. 人工处理
}
}
消息发送流程图:
生产者发送消息
↓
消息到达 Exchange?
├─ YES → ConfirmCallback(ack=true)
└─ NO → ConfirmCallback(ack=false)
↓
消息路由到 Queue?
├─ YES → 正常消费
└─ NO → ReturnsCallback(退回)
7.2 消息持久化
问题: RabbitMQ 宕机后,消息会丢失吗?
解决方案: 持久化 Exchange、Queue 和 Message
Exchange 持久化
@Bean
public DirectExchange inventoryExchange() {
// 参数2:durable = true,持久化
return new DirectExchange("inventory.exchange", true, false);
}
Queue 持久化
@Bean
public Queue inventoryQueue() {
// durable() 方法创建持久化队列
return QueueBuilder.durable("inventory.queue").build();
}
Message 持久化
// Spring AMQP 默认会将消息设置为持久化
// 消息的 deliveryMode = 2 (PERSISTENT)
rabbitTemplate.convertAndSend(exchange, routingKey, message);
// 如需手动设置:
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});
持久化效果:
| 持久化级别 | RabbitMQ 重启后 |
|---|---|
| Exchange 持久化 | Exchange 保留 |
| Queue 持久化 | Queue 保留 |
| Message 持久化 | 消息保留 |
| 都不持久化 | 全部丢失 ❌ |
7.3 消费者确认(ACK)
问题: 如何确保消息被成功消费?
解决方案: 手动确认消息
配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认
ACK 模式对比
| 模式 | 说明 | 优点 | 缺点 | 使用场景 |
|---|---|---|---|---|
| auto | 消息一旦被接收就自动确认 | 简单 | 可能丢消息 | 允许丢失的场景 |
| manual | 需要手动调用 ACK | 可靠 | 代码复杂 | 生产环境 ✅ |
| none | 不确认 | - | 不推荐 | 不推荐使用 |
手动确认代码
@RabbitListener(queues = "inventory.queue")
public void handleMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 处理业务逻辑
processMessage(message);
// ✅ 确认消息(ACK)
// 参数1:deliveryTag 消息标签
// 参数2:multiple 是否批量确认
channel.basicAck(deliveryTag, false);
} catch (BusinessException e) {
// ❌ 业务异常,拒绝消息并不重新入队
// 参数1:deliveryTag
// 参数2:multiple 是否批量
// 参数3:requeue 是否重新入队
channel.basi***ack(deliveryTag, false, false);
} catch (Exception e) {
// ⚠️ 系统异常,拒绝消息并重新入队重试
channel.basi***ack(deliveryTag, false, true);
}
}
三种确认方式
// 1. basicAck - 确认消息
channel.basicAck(deliveryTag, false);
// 2. basi***ack - 拒绝消息(可批量,可重新入队)
channel.basi***ack(deliveryTag, false, true);
// 3. basicReject - 拒绝消息(不可批量)
channel.basicReject(deliveryTag, false);
消费流程图:
消费者接收消息
↓
处理业务逻辑
├─ 成功 → channel.basicAck() → 消息从队列删除
│
├─ 失败(首次)→ channel.basi***ack(requeue=true) → 重新入队
│
└─ 失败(重试后)→ channel.basi***ack(requeue=false) → 死信队列
8. 死信队列实战
8.1 什么是死信队列?
死信(Dead Letter): 无法被正常消费的消息
产生死信的情况:
- 消息被拒绝(basi***ack/basicReject)且 requeue=false
- 消息过期(TTL 超时)
- 队列达到最大长度
死信队列的作用:
- 保存处理失败的消息
- 防止消息丢失
- 方便人工介入处理
8.2 配置死信队列
创建死信交换机和队列
// 位置:order-service/.../RabbitMQConfig.java
// 1. 创建死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead-letter.exchange", true, false);
}
// 2. 创建死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead-letter.queue").build();
}
// 3. 绑定死信队列到死信交换机
@Bean
public Binding deadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dead-letter"); // routing key
}
业务队列配置死信交换机
@Bean
public Queue inventoryQueue() {
return QueueBuilder.durable("inventory.queue")
// 配置死信交换机
.deadLetterExchange("dead-letter.exchange")
// 配置死信路由键
.deadLetterRoutingKey("dead-letter")
// 可选:设置消息 TTL(30秒)
.ttl(30000)
// 可选:设置队列最大长度
.maxLength(10000)
.build();
}
8.3 死信流转过程
普通消息流程:
Producer → Exchange → Queue → Consumer (成功)
死信流程:
Producer → Exchange → Queue → Consumer (失败)
↓
basi***ack(requeue=false)
↓
Dead Letter Exchange
↓
Dead Letter Queue
↓
Dead Letter Consumer
8.4 监听死信队列
// 位置:inventory-service/.../InventoryConsumer.java
@RabbitListener(queues = "dead-letter.queue")
public void handleDeadLetter(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String messageBody = new String(message.getBody());
log.error("【死信队列】接收到死信消息 => {}", messageBody);
// 获取死信原因
Map<String, Object> headers = message.getMessageProperties().getHeaders();
String reason = (String) headers.get("x-first-death-reason");
String queue = (String) headers.get("x-first-death-queue");
log.error("死信原因:{}, 原队列:{}", reason, queue);
// 处理死信消息:
// 1. 保存到数据库,标记为失败
saveToDeadLetterTable(messageBody, reason);
// 2. 发送告警通知
sendAlert("检测到死信消息", messageBody);
// 3. 记录到日志文件
logToFile(messageBody);
// 确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("【死信队列】处理失败", e);
// 死信队列的消息如果再次失败,直接拒绝(不再重试)
channel.basi***ack(deliveryTag, false, false);
}
}
8.5 在 RabbitMQ 管理界面查看
步骤:
- 访问 http://localhost:15672
- 登录(admin/admin123)
- 点击 “Queues” 标签
- 查看
dead-letter.queue - 可以看到死信消息的数量和详情
9. 消息幂等性处理
9.1 什么是消息幂等性?
幂等性: 同一个消息被消费多次,结果应该是一样的
为什么会重复消费?
- 生产者重试导致消息重复发送
- 消费者处理完成但 ACK 失败,导致消息重新入队
- 网络抖动导致消息重发
不做幂等性处理的后果:
// ❌ 不幂等:库存重复扣减
订单A 第1次消费:库存 100 → 99
订单A 第2次消费(重复):库存 99 → 98 // 库存被重复扣减!
```
9.2 幂等性实现方案
方案1:唯一消息ID + Redis
原理: 使用 Redis 记录已处理的消息ID
实现步骤:
// 位置:inventory-service/.../InventoryConsumer.java
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "${rabbitmq.inventory.queue}")
public void handleInventoryReduce(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 获取消息ID(由生产者生成)
String messageId = message.getMessageProperties().getMessageId();
// 2. 检查消息是否已处理(Redis)
String key = "msg:consumed:" + messageId;
Boolean isProcessed = redisTemplate.opsForValue().setIfAbsent(
key,
"1",
24,
TimeUnit.HOURS
);
if (Boolean.FALSE.equals(isProcessed)) {
// 消息已处理,直接确认
log.warn("【幂等性】消息已处理,跳过 => messageId: {}", messageId);
channel.basicAck(deliveryTag, false);
return;
}
// 3. 解析消息
InventoryReduceDTO dto = JSON.parseObject(
new String(message.getBody()),
InventoryReduceDTO.class
);
// 4. 执行业务逻辑
inventoryService.reduceInventory(
dto.getOrderNo(),
dto.getProductId(),
dto.getQuantity()
);
// 5. 确认消息
channel.basicAck(deliveryTag, false);
log.info("【幂等性】消息处理成功 => messageId: {}", messageId);
} catch (Exception e) {
// 处理失败,删除 Redis 标记,允许重试
String key = "msg:consumed:" + message.getMessageProperties().getMessageId();
redisTemplate.delete(key);
// 拒绝消息
channel.basi***ack(deliveryTag, false, true);
}
}
流程图:
接收消息
↓
检查 Redis 中是否存在 messageId
├─ 存在 → 消息已处理 → 直接ACK
└─ 不存在 → 写入 Redis → 执行业务 → ACK
方案2:数据库唯一约束
原理: 利用数据库的唯一索引保证幂等性
数据库设计:
-- 库存流水表
CREATE TABLE t_inventory_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_no VARCHAR(50) NOT NULL UNIQUE, -- 订单号唯一索引 ⭐
product_id BIGINT NOT NULL,
quantity INT NOT NULL,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);
业务代码:
@Transactional
public void reduceInventory(String orderNo, Long productId, Integer quantity) {
// 1. 检查是否已处理
InventoryLog existLog = inventoryLogMapper.selectByOrderNo(orderNo);
if (existLog != null) {
log.warn("【幂等性】订单已处理 => orderNo: {}", orderNo);
return;
}
// 2. 扣减库存(乐观锁)
int updated = inventoryMapper.reduceStock(productId, quantity);
if (updated == 0) {
throw new BusinessException("库存不足");
}
// 3. 记录流水(唯一约束保证幂等)
InventoryLog log = new InventoryLog();
log.setOrderNo(orderNo); // 唯一索引,重复插入会失败
log.setProductId(productId);
log.setQuantity(quantity);
try {
inventoryLogMapper.insert(log);
} catch (DuplicateKeyException e) {
// 重复订单,说明已处理
log.warn("【幂等性】检测到重复订单 => orderNo: {}", orderNo);
throw new BusinessException("订单已处理");
}
}
方案3:业务逻辑天然幂等
适用场景: 某些业务操作本身就是幂等的
示例:
// ✅ 天然幂等:更新订单状态
// 多次执行结果一样
UPDATE t_order SET status = 2 WHERE order_no = 'ORD123456';
// ✅ 天然幂等:查询操作
SELECT * FROM t_order WHERE order_no = 'ORD123456';
// ❌ 非幂等:扣减库存(每次执行结果不同)
UPDATE t_inventory SET stock = stock - 1 WHERE product_id = 1001;
9.3 我们项目中的幂等性实现
库存服务: Redis + 数据库唯一约束
// 1. Redis 快速判重
String key = "order:processed:" + orderNo;
if (!redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS)) {
return; // 已处理
}
// 2. 数据库唯一索引兜底
try {
inventoryLogMapper.insert(log); // order_no 唯一索引
} catch (DuplicateKeyException e) {
// 重复订单
}
通知服务: 允许重复发送(业务上可接受)
// 通知类消息通常允许重复
// 发送多次邮件/短信对业务影响不大
// 可以不做幂等性处理
10. 性能优化技巧
10.1 消费者并发配置
问题: 单个消费者处理速度慢,消息堆积
解决方案: 增加消费者并发数
spring:
rabbitmq:
listener:
simple:
# 最小消费者数量
concurrency: 3
# 最大消费者数量
max-concurrency: 10
# 每个消费者一次拉取的消息数
prefetch: 1
配置说明:
| 参数 | 说明 | 推荐值 |
|---|---|---|
concurrency |
初始消费者线程数 | CPU核心数 |
max-concurrency |
最大消费者线程数 | CPU核心数 * 2 |
prefetch |
每次拉取消息数(限流) | 1-10 |
效果对比:
单消费者:
Queue [1000条消息] → Consumer (1个) → 处理时间: 100秒
并发消费者:
Queue [1000条消息] → Consumer (10个) → 处理时间: 10秒
10.2 批量确认消息
问题: 每条消息单独 ACK,网络开销大
解决方案: 批量确认
@RabbitListener(queues = "inventory.queue")
public void handleBatch(List<Message> messages, Channel channel) throws IOException {
try {
// 批量处理消息
for (Message message : messages) {
processMessage(message);
}
// 批量确认(multiple = true)
long deliveryTag = messages.get(messages.size() - 1)
.getMessageProperties()
.getDeliveryTag();
channel.basicAck(deliveryTag, true); // true = 批量确认
} catch (Exception e) {
// 批量拒绝
channel.basi***ack(deliveryTag, true, true);
}
}
注意事项:
- ⚠️ 批量确认时,如果有一条失败,可能导致所有消息重新消费
- ✅ 适合消息处理速度快、失败率低的场景
10.3 消息预取(Prefetch)
配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 10 # 每个消费者预取10条消息
原理:
prefetch = 1(默认):
Consumer → 拉取1条 → 处理 → ACK → 拉取1条 → 处理 → ACK
↑_______________| ↑_______________|
网络往返 网络往返
prefetch = 10:
Consumer → 拉取10条 → 处理10条 → ACK → 拉取10条
↑_____________________________|
减少网络往返次数
建议值:
- 消息处理速度快:
prefetch = 10-100 - 消息处理速度慢:
prefetch = 1-5 - 需要严格顺序:
prefetch = 1
10.4 连接池优化
配置:
spring:
rabbitmq:
cache:
channel:
# Channel 缓存大小
size: 50
# 最大 Channel 数
checkout-timeout: 30000
10.5 使用延迟队列优化
场景: 订单15分钟未支付自动取消
传统方式(轮询):
// ❌ 每分钟查询数据库,效率低
@Scheduled(cron = "0 * * * * ?")
public void checkUnpaidOrders() {
List<Order> orders = orderMapper.selectUnpaid();
for (Order order : orders) {
if (order.isTimeout()) {
cancelOrder(order);
}
}
}
RabbitMQ 延迟队列:
// ✅ 发送延迟消息,15分钟后自动消费
@Bean
public Queue orderTimeoutQueue() {
return QueueBuilder.durable("order.timeout.queue")
.ttl(900000) // 15分钟
.deadLetterExchange("order.cancel.exchange")
.deadLetterRoutingKey("order.cancel")
.build();
}
// 创建订单时发送延迟消息
rabbitTemplate.convertAndSend("order.timeout.queue", orderNo);
// 15分钟后自动消费
@RabbitListener(queues = "order.cancel.queue")
public void handleOrderTimeout(String orderNo) {
// 检查订单状态,未支付则取消
cancelOrderIfUnpaid(orderNo);
}
10.6 消息压缩
场景: 消息体很大(如订单详情包含大量商品信息)
// 发送前压缩
public void send***pressedMessage(String exchange, String routingKey, Object message) {
String json = JSON.toJSONString(message);
byte[] ***pressed = GzipUtil.***press(json);
rabbitTemplate.convertAndSend(exchange, routingKey, ***pressed, msg -> {
msg.getMessageProperties().setContentEncoding("gzip");
return msg;
});
}
// 接收后解压
@RabbitListener(queues = "large.message.queue")
public void handle***pressed(Message message) {
String encoding = message.getMessageProperties().getContentEncoding();
byte[] body = message.getBody();
if ("gzip".equals(encoding)) {
body = GzipUtil.de***press(body);
}
String json = new String(body);
// 处理消息...
}
11. 监控和调试
11.1 RabbitMQ 管理界面
访问地址: http://localhost:15672
主要功能:
Overview(概览)
- 📊 消息速率(发送/接收)
- 🔗 连接数、通道数
- 📮 队列数、交换机数
Connections(连接)
- 查看所有客户端连接
- 查看连接的IP、端口、用户
- 可手动关闭连接
Channels(通道)
- 每个连接的通道列表
- 查看通道状态(idle/running)
- 消息确认数、未确认数
Exchanges(交换机)
- 所有交换机列表
- 查看交换机类型、绑定关系
- 可手动发送测试消息
Queues(队列)⭐ 重点
- 队列消息数量(Ready、Unacked、Total)
- 消息速率(in***ing、deliver)
- 消费者数量
- 可查看队列中的消息内容
实用操作:
1. 查看队列消息:
Queues → 点击队列名 → Get messages → Get Message(s)
2. 手动发送消息:
Exchanges → 点击交换机 → Publish message → 输入内容 → Publish message
3. 清空队列:
Queues → 点击队列名 → Purge Messages
4. 删除队列:
Queues → 点击队列名 → Delete
11.2 应用日志监控
在我们项目中添加详细日志:
@Slf4j
@***ponent
public class RabbitMQProducer {
public void sendMessage(String exchange, String routingKey, Object message) {
String messageId = UUID.randomUUID().toString();
log.info("【发送消息】messageId: {}, exchange: {}, routingKey: {}, content: {}",
messageId, exchange, routingKey, JSON.toJSONString(message));
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("【消息确认】成功 => messageId: {}", correlationData.getId());
} else {
log.error("【消息确认】失败 => messageId: {}, cause: {}",
correlationData.getId(), cause);
}
}
}
@Slf4j
@***ponent
public class InventoryConsumer {
@RabbitListener(queues = "inventory.queue")
public void handleMessage(Message message, Channel channel) {
String messageId = message.getMessageProperties().getMessageId();
log.info("【接收消息】messageId: {}, content: {}",
messageId, new String(message.getBody()));
try {
// 处理消息
processMessage(message);
channel.basicAck(deliveryTag, false);
log.info("【消息处理】成功 => messageId: {}", messageId);
} catch (Exception e) {
log.error("【消息处理】失败 => messageId: {}, error: {}",
messageId, e.getMessage(), e);
channel.basi***ack(deliveryTag, false, true);
}
}
}
11.3 监控指标
关键指标:
| 指标 | 说明 | 正常值 | 告警阈值 |
|---|---|---|---|
| Queue Ready | 队列中待消费消息 | < 100 | > 1000 |
| Queue Unacked | 未确认消息数 | < 10 | > 100 |
| Consumer Count | 消费者数量 | > 0 | = 0(无消费者) |
| Publish Rate | 消息发送速率 | 稳定 | 突然激增 |
| Deliver Rate | 消息消费速率 | 稳定 | 为0(消费停滞) |
| Memory Used | 内存使用 | < 80% | > 90% |
11.4 常用调试命令
Docker 环境查看 RabbitMQ 日志:
# 查看 RabbitMQ 容器日志
docker logs rabbitmq-demo
# 实时查看日志
docker logs -f rabbitmq-demo
# 查看最后100行
docker logs --tail 100 rabbitmq-demo
进入 RabbitMQ 容器:
# 进入容器
docker exec -it rabbitmq-demo bash
# 查看队列列表
rabbitmqctl list_queues
# 查看交换机列表
rabbitmqctl list_exchanges
# 查看绑定关系
rabbitmqctl list_bindings
# 查看连接
rabbitmqctl list_connections
# 查看消费者
rabbitmqctl list_consumers
12. 常见问题
12.1 消息丢失
问题描述: 发送的消息没有被消费
排查步骤:
1. 检查消息是否发送成功
// 查看生产者日志
2025-10-22 10:00:00 INFO 【RabbitMQ生产者】发送消息 => messageId: xxx
2025-10-22 10:00:00 INFO 【RabbitMQ生产者】消息确认成功 => messageId: xxx
2. 检查 RabbitMQ 管理界面
- Queues → 查看队列消息数量
- 如果 Ready = 0,说明消息已被消费或未到达队列
3. 检查路由配置
// 检查 exchange、routing-key 是否正确
rabbitTemplate.convertAndSend(
"inventory.exchange", // ✅ 正确的交换机名
"inventory.reduce", // ✅ 正确的 routing key
message
);
4. 检查消费者
# 查看消费者数量
rabbitmqctl list_consumers
# 如果消费者数量 = 0,说明消费者没有启动
5. 检查消费者日志
// 是否有接收消息的日志
2025-10-22 10:00:01 INFO 【RabbitMQ消费者】接收到消息 => xxx
12.2 消息堆积
问题描述: 队列中消息越来越多,消费不过来
原因分析:
- 消费者处理速度慢
- 消费者数量不足
- 消费者宕机
解决方案:
1. 增加消费者并发数
spring:
rabbitmq:
listener:
simple:
concurrency: 10 # 增加到10个消费者
max-concurrency: 20
2. 优化业务代码
// ❌ 慢:每次查询数据库
for (Item item : items) {
Product product = productMapper.selectById(item.getProductId());
}
// ✅ 快:批量查询
List<Long> productIds = items.stream()
.map(Item::getProductId)
.collect(Collectors.toList());
List<Product> products = productMapper.selectByIds(productIds);
3. 水平扩展(增加服务实例)
# 启动多个消费者实例
java -jar inventory-service.jar --server.port=8082
java -jar inventory-service.jar --server.port=8092
java -jar inventory-service.jar --server.port=8102
12.3 重复消费
问题描述: 同一条消息被消费多次
原因:
- 消费者处理完成但 ACK 失败
- 网络抖动导致超时重发
解决方案: 实现幂等性(见第9章)
12.4 消息顺序问题
问题描述: 消息乱序消费
原因:
- 多个消费者并发消费
- 消息重试导致顺序混乱
解决方案:
1. 单消费者 + prefetch=1
spring:
rabbitmq:
listener:
simple:
concurrency: 1 # 只有1个消费者
prefetch: 1 # 每次只拉取1条
2. 使用消息优先级
@Bean
public Queue priorityQueue() {
return QueueBuilder.durable("order.queue")
.maxPriority(10) // 设置最大优先级
.build();
}
// 发送消息时设置优先级
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setPriority(5); // 优先级 0-10
return msg;
});
3. 业务层保证顺序
// 使用版本号
UPDATE t_order SET status = 2, version = version + 1
WHERE order_no = 'ORD123' AND version = 1;
12.5 连接失败
问题描述:
Caused by: java.***.ConnectException: Connection refused
排查步骤:
1. 检查 RabbitMQ 是否启动
docker ps | grep rabbitmq
# 如果没有,启动容器
docker-***pose up -d
2. 检查端口是否开放
# Windows
***stat -ano | findstr 5672
# Linux
***stat -tunlp | grep 5672
3. 检查配置
spring:
rabbitmq:
host: localhost # 确保正确
port: 5672 # 确保正确
username: admin
password: admin123
4. 防火墙问题
# 关闭防火墙(仅测试环境)
# Windows: 控制面板 → 防火墙 → 关闭
# Linux: systemctl stop firewalld
12.6 队列未创建
问题描述:
Caused by: ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND)
原因: 队列不存在
解决方案:
1. 确保 RabbitAdmin Bean 存在
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
2. 先启动生产者(声明队列),再启动消费者
启动顺序:
1. order-service (生产者,声明队列)
2. inventory-service (消费者)
3. notification-service (消费者)
3. 手动创建队列
- 访问 http://localhost:15672
- Queues → Add a new queue
- 输入队列名、配置参数
- Add queue
13. 总结与展望
13.1 学习总结
通过这个项目,我们学习了:
✅ RabbitMQ 核心概念
- Exchange、Queue、Binding
- 四种交换机类型(Direct、Topic、Fanout、Headers)
- 生产者、消费者模型
✅ Spring Boot 集成 RabbitMQ
- 配置 RabbitMQ 连接
- 声明 Exchange、Queue、Binding
- 使用
RabbitTemplate发送消息 - 使用
@RabbitListener接收消息
✅ 消息可靠性保证
- 生产者确认(Publisher Confirm)
- 生产者退回(Publisher Return)
- 消息持久化(Exchange、Queue、Message)
- 消费者手动确认(ACK/NACK)
✅ 高级特性
- 死信队列(处理失败消息)
- 消息幂等性(防止重复消费)
- 延迟队列(定时任务)
- 消息优先级
✅ 性能优化
- 消费者并发配置
- 批量确认消息
- 消息预取(Prefetch)
- 连接池优化
✅ 监控调试
- RabbitMQ 管理界面
- 应用日志监控
- 常见问题排查
13.2 项目亮点
🎯 完整的业务场景
- 订单、库存、通知三大核心服务
- 真实的电商业务流程
- 前后端完整实现
🐳 一键启动环境
- Docker ***pose 管理基础设施
- 无需手动安装 RabbitMQ、MySQL、Redis
- 数据库自动初始化
📦 生产级代码
- 消息确认、重试、死信队列
- 幂等性处理、事务保证
- 详细的日志和异常处理
📚 详细的文档
- 启动指南
- Docker 配置详解
- 学习笔记(本文档)