RabbitMQ 实战教程:从零构建微服务电商订单系统

RabbitMQ 实战教程:从零构建微服务电商订单系统

RabbitMQ 实战教程:从零构建微服务电商订单系统


📌 摘要:本教程通过构建一个完整的电商订单微服务系统,带你从零掌握 RabbitMQ 在 Spring Boot 项目中的实战应用。 项目采用 Spring Boot 2.7 + Spring AMQP + MyBatis-Plus + MySQL 8.0 + Redis 构建后端微服务架构(订单、库存、通知、网关四大服务),前端使用 Vue3 + Element Plus 提供可视化操作界面,基础设施通过 Docker ***pose 一键部署(RabbitMQ 3.12 + MySQL + Redis)。教程涵盖 RabbitMQ 核心概念、Direct/Topic/Fanout 三种交换机实战、生产者确认与退回机制、消息持久化、消费者手动 ACK、死信队列配置、消息幂等性处理(基于 Redis 去重)、消息日志追踪等生产级特性。

🎯 本教程的独特优势

其他教程 本教程
❌ 只讲理论概念 理论 + 完整项目实战
❌ 简单的 Hello World 真实的微服务电商系统
❌ 代码片段零散 完整可运行的代码仓库
❌ 缺少环境搭建 Docker 一键启动全部基础设施
❌ 没有前端界面 Vue3 可视化操作界面
❌ 不讲消息可靠性 生产者确认、消费者确认、死信队列全覆盖
❌ 缺少实战场景 订单、库存、通知真实业务场景
❌ 看完还是不会用 跟着教程走一遍,立即上手实战

💡 你将学到什么

基础篇

  • ✅ RabbitMQ 核心概念(Exchange、Queue、Binding)
  • ✅ 四种交换机类型的实际应用
  • ✅ 消息生产者和消费者的实现

进阶篇

  • ✅ 消息可靠性保证(生产者确认、消息持久化、消费者确认)
  • ✅ 死信队列的配置和使用
  • ✅ 消息幂等性处理(防止重复消费)

实战篇

  • ✅ 完整的微服务项目(订单、库存、通知、网关)
  • ✅ Spring Boot 集成 RabbitMQ 最佳实践
  • ✅ Docker ***pose 一键部署
  • ✅ 前后端完整代码

🎁 附赠内容

  • 📦 完整源码:开箱即用的 Maven + Spring Boot 项目
  • 🐳 Docker 环境:RabbitMQ + MySQL + Redis 一键启动
  • 🎨 前端界面:Vue3 + Element Plus 管理界面
  • 📚 详细文档:启动指南、Docker 配置详解、学习笔记
  • 💬 代码注释:每个关键点都有详细注释

🚀 学完后你能做什么

  • ✅ 在自己的项目中应用 RabbitMQ
  • ✅ 设计高可用的消息队列架构
  • ✅ 解决消息丢失、重复消费等问题
  • ✅ 面试时自信地讲解 RabbitMQ 原理和实践
  • ✅ 快速上手 Kafka、RocketMQ 等其他消息队列

📖 目录

  1. 项目介绍
  2. 快速开始
  3. RabbitMQ 核心概念
  4. 项目架构详解
  5. 消息生产者实现
  6. 消息消费者实现
  7. 消息可靠性保证
  8. 死信队列实战
  9. 消息幂等性处理
  10. 性能优化技巧
  11. 监控和调试
  12. 常见问题
  13. 总结与展望

1. 项目介绍

1.1 业务场景

我们构建了一个电商订单系统,包含以下核心流程:

用户创建订单
    ↓
订单服务保存订单 → 发送消息到 RabbitMQ
    ↓                      ↓
返回成功            ┌──────┴──────┐
                    ↓              ↓
            库存服务扣减库存   通知服务发送通知
              (异步)            (异步)

为什么用 RabbitMQ?

传统同步调用的问题:

// ❌ 传统方式:串行调用,响应慢
orderService.createOrder(order);
inventoryService.reduceStock(order);     // 等待库存服务
notificationService.sendNotify(order);   // 等待通知服务
// 总耗时 = 订单服务 + 库存服务 + 通知服务

使用 RabbitMQ 后:

// ✅ 异步方式:发完消息就返回
orderService.createOrder(order);
rabbitMQProducer.sendMessage("inventory", orderMessage);
rabbitMQProducer.sendMessage("notification", orderMessage);
// 总耗时 = 订单服务 + 发送消息(几毫秒)

1.2 技术栈

后端:

  • Spring Boot 2.7.18
  • Spring AMQP(RabbitMQ)
  • MyBatis-Plus
  • MySQL 8.0
  • Redis

前端:

  • Vue 3
  • Element Plus
  • Axios

基础设施:

  • RabbitMQ 3.12(Docker)
  • MySQL 8.0(Docker)
  • Redis 7(Docker)

1.3 项目结构

rabbitmq-microservice-demo/
├── order-service/              # 订单服务(生产者)
│   └── rabbitmq/
│       ├── RabbitMQConfig.java     # RabbitMQ 配置
│       └── RabbitMQProducer.java   # 消息发送
├── inventory-service/          # 库存服务(消费者)
│   └── rabbitmq/
│       └── InventoryConsumer.java  # 消息接收
├── notification-service/       # 通知服务(消费者)
│   └── rabbitmq/
│       └── NotificationConsumer.java
├── gateway-service/            # API 网关
├── frontend/                   # Vue3 前端
├── docker-***pose.yml          # 一键启动脚本
└── db/init.sql                # 数据库初始化

2. 快速开始

2.1 前置准备

  • JDK 1.8+
  • Maven 3.6+
  • Docker Desktop
  • Node.js 16+

2.2 启动基础设施

# 克隆项目
git clone [你的项目地址]
cd rabbitmq-microservice-demo

# 启动 RabbitMQ + MySQL + Redis
docker-***pose up -d

# 等待 30 秒让服务完全启动

2.3 访问 RabbitMQ 管理界面

浏览器打开:http://localhost:15672

  • 用户名:admin
  • 密码:admin123

你会看到:

  • 📊 Dashboard:系统概览
  • 🔄 Exchanges:交换机列表
  • 📮 Queues:队列列表
  • 🔗 Connections:连接状态

2.4 启动后端服务

# 方式1:使用 IDEA
# 按顺序运行:
# 1. OrderServiceApplication (8081)
# 2. InventoryServiceApplication (8082)
# 3. NotificationServiceApplication (8083)
# 4. GatewayServiceApplication (8080)

# 方式2:使用命令行
cd order-service && mvn spring-boot:run

2.5 启动前端

cd frontend
npm install
npm run dev

访问:http://localhost:5173

2.6 测试完整流程

  1. 在前端创建一个订单
  2. 观察后端控制台的日志输出
  3. 在 RabbitMQ 管理界面看消息流转
  4. 查看数据库中的数据变化

恭喜!🎉 环境搭建完成,开始学习吧!


3. RabbitMQ 核心概念

3.1 基本组件

Producer → Exchange → Queue → Consumer
  生产者    交换机     队列     消费者
Producer(生产者)

是什么?

  • 发送消息的应用程序

在我们项目中:

  • 订单服务就是生产者
  • 位置:order-service/src/main/java/***/example/order/rabbitmq/RabbitMQProducer.java

代码示例:

// 订单服务发送消息
public void createOrder(OrderCreateDTO dto) {
    // 1. 保存订单
    orderRepository.save(order);
    
    // 2. 发送库存扣减消息
    rabbitMQProducer.sendMessage(
        "inventory.exchange",
        "inventory.reduce",
        new InventoryReduceDTO(orderNo, productId, quantity)
    );
    
    // 3. 发送订单通知消息
    rabbitMQProducer.sendMessage(
        "notification.exchange",
        "notification.order",
        new OrderNotificationDTO(orderNo, userId, ...)
    );
}
Exchange(交换机)

是什么?

  • 接收生产者的消息
  • 根据路由规则将消息分发到队列

在我们项目中定义了 3 个交换机:

// 位置:order-service/.../RabbitMQConfig.java

// 1. 库存交换机(Direct 类型)
@Bean
public DirectExchange inventoryExchange() {
    return new DirectExchange("inventory.exchange", true, false);
}

// 2. 通知交换机(Topic 类型)
@Bean
public TopicExchange notificationExchange() {
    return new TopicExchange("notification.exchange", true, false);
}

// 3. 死信交换机(处理失败消息)
@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("dead-letter.exchange", true, false);
}
Queue(队列)

是什么?

  • 存储消息的容器
  • 消费者从队列中获取消息

在我们项目中定义了 3 个队列:

// 1. 库存队列
@Bean
public Queue inventoryQueue() {
    return QueueBuilder.durable("inventory.queue")
        .deadLetterExchange("dead-letter.exchange")  // 配置死信交换机
        .deadLetterRoutingKey("dead-letter")
        .build();
}

// 2. 通知队列
@Bean
public Queue notificationQueue() {
    return QueueBuilder.durable("notification.queue")
        .deadLetterExchange("dead-letter.exchange")
        .deadLetterRoutingKey("dead-letter")
        .build();
}

// 3. 死信队列
@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("dead-letter.queue").build();
}
Consumer(消费者)

是什么?

  • 接收并处理消息的应用程序

在我们项目中:

  • 库存服务消费库存队列
  • 通知服务消费通知队列

代码示例:

// 位置:inventory-service/.../InventoryConsumer.java

@RabbitListener(queues = "inventory.queue")
public void handleInventoryReduce(Message message, Channel channel) {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    try {
        // 1. 解析消息
        InventoryReduceDTO dto = JSON.parseObject(
            new String(message.getBody()), 
            InventoryReduceDTO.class
        );
        
        // 2. 执行业务逻辑(扣减库存)
        inventoryService.reduceInventory(
            dto.getOrderNo(), 
            dto.getProductId(), 
            dto.getQuantity()
        );
        
        // 3. 手动确认消息
        channel.basicAck(deliveryTag, false);
        
    } catch (Exception e) {
        // 4. 处理失败,拒绝消息
        channel.basi***ack(deliveryTag, false, false);
    }
}

3.2 交换机类型详解

Direct Exchange(直连交换机)

路由规则: routing key 必须完全匹配

使用场景: 需要精确路由的场景

在我们项目中:

// 库存交换机使用 Direct 类型
@Bean
public DirectExchange inventoryExchange() {
    return new DirectExchange("inventory.exchange", true, false);
}

// 绑定:routing-key = "inventory.reduce"
@Bean
public Binding inventoryBinding() {
    return BindingBuilder
        .bind(inventoryQueue())
        .to(inventoryExchange())
        .with("inventory.reduce");  // 必须精确匹配
}

消息流转:

发送消息:routing-key = "inventory.reduce"
    ↓
inventory.exchange (Direct)
    ↓
匹配 routing-key = "inventory.reduce"
    ↓
路由到 inventory.queue
Topic Exchange(主题交换机)

路由规则: 支持通配符

  • * 匹配一个单词
  • # 匹配零个或多个单词

使用场景: 需要灵活路由的场景

在我们项目中:

// 通知交换机使用 Topic 类型
@Bean
public TopicExchange notificationExchange() {
    return new TopicExchange("notification.exchange", true, false);
}

// 绑定
@Bean
public Binding notificationBinding() {
    return BindingBuilder
        .bind(notificationQueue())
        .to(notificationExchange())
        .with("notification.order");  // 可以用通配符:notification.#
}

通配符示例:

notification.*          匹配 notification.order、notification.sms
notification.#          匹配 notification.order.create、notification.sms.send
notification.order.*    匹配 notification.order.create、notification.order.cancel
Fanout Exchange(扇形交换机)

路由规则: 广播到所有绑定的队列,忽略 routing key

使用场景: 广播通知、日志收集

代码示例:

// 创建 Fanout 交换机
@Bean
public FanoutExchange broadcastExchange() {
    return new FanoutExchange("broadcast.exchange", true, false);
}

// 绑定多个队列
@Bean
public Binding smsBinding() {
    return BindingBuilder.bind(smsQueue()).to(broadcastExchange());
}

@Bean
public Binding emailBinding() {
    return BindingBuilder.bind(emailQueue()).to(broadcastExchange());
}

// 发送消息(routing key 无效)
rabbitTemplate.convertAndSend("broadcast.exchange", "", message);
// 所有绑定的队列都会收到消息

3.3 Binding(绑定)

是什么?

  • Exchange 和 Queue 之间的关系
  • 定义路由规则

在我们项目中:

@Bean
public Binding inventoryBinding() {
    return BindingBuilder
        .bind(inventoryQueue())           // 队列
        .to(inventoryExchange())          // 交换机
        .with("inventory.reduce");        // routing key
}

绑定关系可视化:

inventory.exchange (Direct)
    ↓ (routing-key: inventory.reduce)
inventory.queue
    ↓
InventoryConsumer

notification.exchange (Topic)
    ↓ (routing-key: notification.order)
notification.queue
    ↓
NotificationConsumer

4. 项目架构详解

4.1 整体架构图

┌─────────────────────────────────────────────────────┐
│                    前端界面 (Vue3)                    │
│              http://localhost:5173                   │
└─────────────────┬───────────────────────────────────┘
                  │ HTTP
        ┌─────────▼──────────┐
        │   API 网关 (8080)   │
        └─────────┬──────────┘
                  │
     ┌────────────┼────────────┐
     │            │            │
┌────▼────┐  ┌───▼────┐  ┌───▼─────┐
│ 订单服务 │  │库存服务 │  │通知服务 │
│ (8081)  │  │ (8082) │  │ (8083)  │
│ 生产者  │  │ 消费者 │  │ 消费者  │
└────┬────┘  └───▲────┘  └───▲─────┘
     │           │            │
     │  ┌────────┴────────────┴────────┐
     └──►   RabbitMQ (Docker:5672)     │
        │  - inventory.queue            │
        │  - notification.queue         │
        │  - dead-letter.queue          │
        └───────────────────────────────┘
             │               │
    ┌────────▼────┐    ┌────▼─────┐
    │MySQL (3306) │    │Redis(6379)│
    │   (Docker)  │    │  (Docker) │
    └─────────────┘    └───────────┘

4.2 消息流转流程

完整的订单创建流程:

① 用户在前端提交订单
    ↓
② 前端 POST → API网关 → 订单服务
    ↓
③ 订单服务保存订单到 MySQL
    ↓
④ 订单服务发送消息到 RabbitMQ
    ├─→ inventory.exchange → inventory.queue
    └─→ notification.exchange → notification.queue
    ↓
⑤ 返回成功给前端(异步处理)
    ↓
⑥ 库存服务监听 inventory.queue
    ├─ 接收消息
    ├─ 扣减库存(乐观锁)
    ├─ 记录库存流水
    └─ ACK 确认
    ↓
⑦ 通知服务监听 notification.queue
    ├─ 接收消息
    ├─ 发送通知(模拟邮件/短信)
    ├─ 记录通知日志
    └─ ACK 确认
    ↓
⑧ 用户可以在前端查看订单、库存、通知状态

时序图:

用户     订单服务    RabbitMQ    库存服务    通知服务
 │          │           │           │           │
 ├─创建订单─►│           │           │           │
 │          ├─保存订单─►DB          │           │
 │          ├─发消息───►│           │           │
 │◄─返回成功─┤           │           │           │
 │          │           ├─推送────►│           │
 │          │           │           ├─扣库存───►DB
 │          │           │           ├─ACK─────►│
 │          │           ├─推送─────────────────►│
 │          │           │           │           ├─发通知─►
 │          │           │           │           ├─ACK───►│
 │          │           │           │           │

4.3 项目目录结构

订单服务(生产者):

order-service/
└── src/main/java/***/example/order/
    ├── controller/
    │   └── OrderController.java        # REST API
    ├── service/
    │   └── OrderService.java           # 业务逻辑
    ├── rabbitmq/
    │   ├── RabbitMQConfig.java         # 核心配置 ⭐
    │   └── RabbitMQProducer.java       # 消息发送 ⭐
    ├── entity/
    │   └── Order.java                  # 订单实体
    └── dto/
        ├── InventoryReduceDTO.java     # 库存消息DTO
        └── OrderNotificationDTO.java   # 通知消息DTO

库存服务(消费者):

inventory-service/
└── src/main/java/***/example/inventory/
    ├── service/
    │   └── InventoryService.java       # 库存业务
    └── rabbitmq/
        └── InventoryConsumer.java      # 消息接收 ⭐

5. 消息生产者实现

5.1 配置 RabbitMQ

位置: order-service/src/main/java/***/example/order/rabbitmq/RabbitMQConfig.java

@Configuration
public class RabbitMQConfig {

    // 从配置文件读取队列名称
    @Value("${rabbitmq.inventory.exchange}")
    private String inventoryExchange;

    @Value("${rabbitmq.inventory.queue}")
    private String inventoryQueue;

    @Value("${rabbitmq.inventory.routing-key}")
    private String inventoryRoutingKey;

    // ========== 创建交换机 ==========
    
    @Bean
    public DirectExchange inventoryExchange() {
        // 参数1:交换机名称
        // 参数2:是否持久化(重启后是否存在)
        // 参数3:是否自动删除
        return new DirectExchange(inventoryExchange, true, false);
    }

    // ========== 创建队列 ==========
    
    @Bean
    public Queue inventoryQueue() {
        return QueueBuilder.durable(inventoryQueue)
            // 配置死信交换机(消息处理失败后的去处)
            .deadLetterExchange("dead-letter.exchange")
            .deadLetterRoutingKey("dead-letter")
            .build();
    }

    // ========== 绑定交换机和队列 ==========
    
    @Bean
    public Binding inventoryBinding() {
        return BindingBuilder
            .bind(inventoryQueue())         // 队列
            .to(inventoryExchange())        // 交换机
            .with(inventoryRoutingKey);     // routing key
    }

    // ========== 配置 RabbitTemplate ==========
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        // 设置消息转换器为 JSON
        template.setMessageConverter(jackson2JsonMessageConverter());
        
        // 消息路由失败时是否退回
        template.setMandatory(true);
        
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // ========== 配置 RabbitAdmin ==========
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

配置文件: order-service/src/main/resources/application.yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin123
    virtual-host: /
    # 生产者确认
    publisher-confirm-type: correlated
    # 生产者退回
    publisher-returns: true
    template:
      mandatory: true

# 自定义配置
rabbitmq:
  inventory:
    exchange: inventory.exchange
    queue: inventory.queue
    routing-key: inventory.reduce
  notification:
    exchange: notification.exchange
    queue: notification.queue
    routing-key: notification.order

5.2 实现消息发送

位置: order-service/src/main/java/***/example/order/rabbitmq/RabbitMQProducer.java

@Slf4j
@***ponent
public class RabbitMQProducer implements 
        RabbitTemplate.ConfirmCallback,    // 确认回调
        RabbitTemplate.ReturnsCallback {    // 退回回调

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageLogMapper messageLogMapper;

    @PostConstruct
    public void init() {
        // 设置回调
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 发送消息到 RabbitMQ
     */
    public void sendMessage(String exchange, String routingKey, Object message) {
        try {
            // 1. 生成消息唯一ID
            String messageId = UUID.randomUUID().toString();
            
            // 2. 保存消息记录到数据库(用于消息追踪)
            MessageLog messageLog = new MessageLog();
            messageLog.setMessageId(messageId);
            messageLog.setContent(JSON.toJSONString(message));
            messageLog.setExchange(exchange);
            messageLog.setRoutingKey(routingKey);
            messageLog.setStatus(0); // 发送中
            messageLogMapper.insert(messageLog);

            // 3. 发送消息
            CorrelationData correlationData = new CorrelationData(messageId);
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);

            log.info("【RabbitMQ生产者】发送消息 => messageId: {}, exchange: {}, routingKey: {}", 
                    messageId, exchange, routingKey);

        } catch (Exception e) {
            log.error("【RabbitMQ生产者】发送消息失败", e);
            throw new RuntimeException("发送消息失败", e);
        }
    }

    /**
     * 消息确认回调
     * 确认消息是否到达 Exchange
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String messageId = correlationData.getId();
        
        if (ack) {
            // ✅ 消息成功到达交换机
            log.info("【RabbitMQ生产者】消息确认成功 => messageId: {}", messageId);
            
            // 更新消息状态为已发送
            MessageLog messageLog = messageLogMapper.selectByMessageId(messageId);
            if (messageLog != null) {
                messageLog.setStatus(1);
                messageLogMapper.updateById(messageLog);
            }
        } else {
            // ❌ 消息未到达交换机
            log.error("【RabbitMQ生产者】消息确认失败 => messageId: {}, cause: {}", 
                    messageId, cause);
            
            // 更新消息状态为发送失败
            MessageLog messageLog = messageLogMapper.selectByMessageId(messageId);
            if (messageLog != null) {
                messageLog.setStatus(2);
                messageLog.setRetryCount(messageLog.getRetryCount() + 1);
                messageLogMapper.updateById(messageLog);
            }

            // TODO: 实现重试逻辑
        }
    }

    /**
     * 消息退回回调
     * 当消息从交换机路由到队列失败时触发
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("【RabbitMQ生产者】消息退回 => message: {}, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
                new String(returned.getMessage().getBody()),
                returned.getReplyCode(),
                returned.getReplyText(),
                returned.getExchange(),
                returned.getRoutingKey());

        // TODO: 实现补偿逻辑
    }
}

5.3 在业务代码中使用

位置: order-service/src/main/java/***/example/order/service/OrderService.java

@Service
public class OrderService {

    @Autowired
    private RabbitMQProducer rabbitMQProducer;

    @Value("${rabbitmq.inventory.exchange}")
    private String inventoryExchange;

    @Value("${rabbitmq.inventory.routing-key}")
    private String inventoryRoutingKey;

    @Transactional
    public Order createOrder(OrderCreateDTO dto) {
        // 1. 生成订单编号
        String orderNo = "ORD" + IdUtil.getSnowflakeNextIdStr();

        // 2. 创建订单对象
        Order order = new Order();
        order.setOrderNo(orderNo);
        order.setUserId(dto.getUserId());
        order.setProductId(dto.getProductId());
        order.setQuantity(dto.getQuantity());
        // ... 设置其他字段

        // 3. 保存订单到数据库
        save(order);
        
        log.info("【订单服务】创建订单成功 => orderNo: {}", orderNo);

        // 4. 发送库存扣减消息
        InventoryReduceDTO inventoryMessage = new InventoryReduceDTO(
            orderNo, 
            dto.getProductId(), 
            dto.getQuantity()
        );
        rabbitMQProducer.sendMessage(
            inventoryExchange, 
            inventoryRoutingKey, 
            inventoryMessage
        );

        // 5. 发送订单通知消息
        OrderNotificationDTO notificationMessage = new OrderNotificationDTO(
            orderNo, 
            dto.getUserId(), 
            productName, 
            dto.getQuantity(), 
            totalAmount, 
            1  // 通知类型:订单创建
        );
        rabbitMQProducer.sendMessage(
            notificationExchange, 
            notificationRoutingKey, 
            notificationMessage
        );

        return order;
    }
}

6. 消息消费者实现

6.1 库存服务消费者

位置: inventory-service/src/main/java/***/example/inventory/rabbitmq/InventoryConsumer.java

@Slf4j
@***ponent
public class InventoryConsumer {

    @Autowired
    private InventoryService inventoryService;

    /**
     * 监听库存队列
     * 处理库存扣减消息
     */
    @RabbitListener(queues = "${rabbitmq.inventory.queue}")
    public void handleInventoryReduce(Message message, Channel channel) 
            throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        try {
            // 1. 获取消息内容
            String messageBody = new String(message.getBody());
            log.info("【RabbitMQ消费者】接收到库存扣减消息 => {}", messageBody);

            // 2. 解析消息
            InventoryReduceDTO dto = JSON.parseObject(
                messageBody, 
                InventoryReduceDTO.class
            );

            // 3. 处理业务逻辑 - 扣减库存
            inventoryService.reduceInventory(
                dto.getOrderNo(), 
                dto.getProductId(), 
                dto.getQuantity()
            );

            // 4. 手动确认消息(ACK)
            // 参数1:deliveryTag 消息标签
            // 参数2:multiple 是否批量确认
            channel.basicAck(deliveryTag, false);
            
            log.info("【RabbitMQ消费者】消息处理成功,已确认 => orderNo: {}", 
                    dto.getOrderNo());

        } catch (Exception e) {
            log.error("【RabbitMQ消费者】消息处理失败 => deliveryTag: {}, error: {}", 
                    deliveryTag, e.getMessage(), e);

            // 判断是否已经重试过
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            
            if (redelivered) {
                // 已经重试过,拒绝消息并不重新入队(进入死信队列)
                // 参数1:deliveryTag
                // 参数2:multiple 是否批量
                // 参数3:requeue 是否重新入队
                channel.basi***ack(deliveryTag, false, false);
                
                log.error("【RabbitMQ消费者】消息已重试,拒绝并进入死信队列 => deliveryTag: {}", 
                        deliveryTag);
            } else {
                // 第一次失败,拒绝消息并重新入队进行重试
                channel.basi***ack(deliveryTag, false, true);
                
                log.warn("【RabbitMQ消费者】消息处理失败,重新入队重试 => deliveryTag: {}", 
                        deliveryTag);
            }
        }
    }

    /**
     * 监听死信队列
     * 处理多次重试失败的消息
     */
    @RabbitListener(queues = "dead-letter.queue")
    public void handleDeadLetter(Message message, Channel channel) 
            throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        try {
            String messageBody = new String(message.getBody());
            log.error("【RabbitMQ死信队列】接收到死信消息 => {}", messageBody);

            // 这里可以:
            // 1. 保存到数据库,由人工处理
            // 2. 发送告警通知
            // 3. 记录到日志文件
            
            // 确认消息
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("【RabbitMQ死信队列】处理失败", e);
            channel.basi***ack(deliveryTag, false, false);
        }
    }
}

6.2 通知服务消费者

位置: notification-service/src/main/java/***/example/notification/rabbitmq/NotificationConsumer.java

@Slf4j
@***ponent
public class NotificationConsumer {

    @Autowired
    private NotificationService notificationService;

    /**
     * 监听通知队列
     */
    @RabbitListener(queues = "${rabbitmq.notification.queue}")
    public void handleOrderNotification(Message message, Channel channel) 
            throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        try {
            // 1. 获取消息内容
            String messageBody = new String(message.getBody());
            log.info("【RabbitMQ消费者】接收到订单通知消息 => {}", messageBody);

            // 2. 解析消息
            OrderNotificationDTO dto = JSON.parseObject(
                messageBody, 
                OrderNotificationDTO.class
            );

            // 3. 构建通知内容
            String content = String.format(
                "尊敬的用户,您的订单 %s 已创建成功!\n" +
                "商品:%s\n" +
                "数量:%d\n" +
                "金额:¥%.2f\n" +
                "感谢您的购买!",
                dto.getOrderNo(),
                dto.getProductName(),
                dto.getQuantity(),
                dto.getTotalAmount()
            );

            // 4. 发送通知
            notificationService.sendNotification(
                dto.getOrderNo(),
                dto.getUserId(),
                content,
                dto.getType()
            );

            // 5. 手动确认消息
            channel.basicAck(deliveryTag, false);
            
            log.info("【RabbitMQ消费者】通知发送成功 => orderNo: {}", 
                    dto.getOrderNo());

        } catch (Exception e) {
            log.error("【RabbitMQ消费者】处理通知消息失败 => deliveryTag: {}", 
                    deliveryTag, e);

            // 通知发送失败通常不需要重试,直接拒绝
            channel.basi***ack(deliveryTag, false, false);
        }
    }
}

6.3 消费者配置

位置: inventory-service/src/main/resources/application.yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin123
    virtual-host: /
    listener:
      simple:
        # 手动确认消息
        acknowledge-mode: manual
        # 消费者最小数量
        concurrency: 1
        # 消费者最大数量
        max-concurrency: 10
        # 限流:每次只处理一条消息
        prefetch: 1
        # 是否支持重试
        retry:
          enabled: true
          # 最大重试次数
          max-attempts: 3
          # 重试间隔(毫秒)
          initial-interval: 3000

rabbitmq:
  inventory:
    queue: inventory.queue

7. 消息可靠性保证

7.1 生产者确认(Publisher Confirm)

问题: 如何确保消息成功发送到 RabbitMQ?

解决方案: 开启生产者确认机制

配置
spring:
  rabbitmq:
    # 开启生产者确认
    publisher-confirm-type: correlated  # SIMPLE | CORRELATED | NONE
    # 开启生产者退回
    publisher-returns: true
    template:
      # 消息路由失败时是否退回
      mandatory: true
实现确认回调
@***ponent
public class RabbitMQProducer implements RabbitTemplate.ConfirmCallback {

    /**
     * 消息是否成功到达 Exchange
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String messageId = correlationData.getId();
        
        if (ack) {
            // ✅ 成功到达交换机
            log.info("消息发送成功 => messageId: {}", messageId);
            // 更新数据库:消息状态 = 已发送
        } else {
            // ❌ 未到达交换机
            log.error("消息发送失败 => messageId: {}, cause: {}", messageId, cause);
            // 更新数据库:消息状态 = 发送失败
            // 实现重试逻辑
        }
    }
}
实现退回回调
@***ponent
public class RabbitMQProducer implements RabbitTemplate.ReturnsCallback {

    /**
     * 消息从 Exchange 路由到 Queue 失败时触发
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息退回 => message: {}, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
                new String(returned.getMessage().getBody()),
                returned.getReplyCode(),
                returned.getReplyText(),
                returned.getExchange(),
                returned.getRoutingKey());
        
        // 实现补偿逻辑:
        // 1. 记录到数据库
        // 2. 发送告警
        // 3. 人工处理
    }
}

消息发送流程图:

生产者发送消息
    ↓
消息到达 Exchange?
    ├─ YES → ConfirmCallback(ack=true)
    └─ NO  → ConfirmCallback(ack=false)
    ↓
消息路由到 Queue?
    ├─ YES → 正常消费
    └─ NO  → ReturnsCallback(退回)

7.2 消息持久化

问题: RabbitMQ 宕机后,消息会丢失吗?

解决方案: 持久化 Exchange、Queue 和 Message

Exchange 持久化
@Bean
public DirectExchange inventoryExchange() {
    // 参数2:durable = true,持久化
    return new DirectExchange("inventory.exchange", true, false);
}
Queue 持久化
@Bean
public Queue inventoryQueue() {
    // durable() 方法创建持久化队列
    return QueueBuilder.durable("inventory.queue").build();
}
Message 持久化
// Spring AMQP 默认会将消息设置为持久化
// 消息的 deliveryMode = 2 (PERSISTENT)
rabbitTemplate.convertAndSend(exchange, routingKey, message);

// 如需手动设置:
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
    msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return msg;
});

持久化效果:

持久化级别 RabbitMQ 重启后
Exchange 持久化 Exchange 保留
Queue 持久化 Queue 保留
Message 持久化 消息保留
都不持久化 全部丢失 ❌

7.3 消费者确认(ACK)

问题: 如何确保消息被成功消费?

解决方案: 手动确认消息

配置
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认
ACK 模式对比
模式 说明 优点 缺点 使用场景
auto 消息一旦被接收就自动确认 简单 可能丢消息 允许丢失的场景
manual 需要手动调用 ACK 可靠 代码复杂 生产环境 ✅
none 不确认 - 不推荐 不推荐使用
手动确认代码
@RabbitListener(queues = "inventory.queue")
public void handleMessage(Message message, Channel channel) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    try {
        // 处理业务逻辑
        processMessage(message);
        
        // ✅ 确认消息(ACK)
        // 参数1:deliveryTag 消息标签
        // 参数2:multiple 是否批量确认
        channel.basicAck(deliveryTag, false);
        
    } catch (BusinessException e) {
        // ❌ 业务异常,拒绝消息并不重新入队
        // 参数1:deliveryTag
        // 参数2:multiple 是否批量
        // 参数3:requeue 是否重新入队
        channel.basi***ack(deliveryTag, false, false);
        
    } catch (Exception e) {
        // ⚠️ 系统异常,拒绝消息并重新入队重试
        channel.basi***ack(deliveryTag, false, true);
    }
}
三种确认方式
// 1. basicAck - 确认消息
channel.basicAck(deliveryTag, false);

// 2. basi***ack - 拒绝消息(可批量,可重新入队)
channel.basi***ack(deliveryTag, false, true);

// 3. basicReject - 拒绝消息(不可批量)
channel.basicReject(deliveryTag, false);

消费流程图:

消费者接收消息
    ↓
处理业务逻辑
    ├─ 成功 → channel.basicAck() → 消息从队列删除
    │
    ├─ 失败(首次)→ channel.basi***ack(requeue=true) → 重新入队
    │
    └─ 失败(重试后)→ channel.basi***ack(requeue=false) → 死信队列

8. 死信队列实战

8.1 什么是死信队列?

死信(Dead Letter): 无法被正常消费的消息

产生死信的情况:

  1. 消息被拒绝(basi***ack/basicReject)且 requeue=false
  2. 消息过期(TTL 超时)
  3. 队列达到最大长度

死信队列的作用:

  • 保存处理失败的消息
  • 防止消息丢失
  • 方便人工介入处理

8.2 配置死信队列

创建死信交换机和队列
// 位置:order-service/.../RabbitMQConfig.java

// 1. 创建死信交换机
@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("dead-letter.exchange", true, false);
}

// 2. 创建死信队列
@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("dead-letter.queue").build();
}

// 3. 绑定死信队列到死信交换机
@Bean
public Binding deadLetterBinding() {
    return BindingBuilder
        .bind(deadLetterQueue())
        .to(deadLetterExchange())
        .with("dead-letter");  // routing key
}
业务队列配置死信交换机
@Bean
public Queue inventoryQueue() {
    return QueueBuilder.durable("inventory.queue")
        // 配置死信交换机
        .deadLetterExchange("dead-letter.exchange")
        // 配置死信路由键
        .deadLetterRoutingKey("dead-letter")
        // 可选:设置消息 TTL(30秒)
        .ttl(30000)
        // 可选:设置队列最大长度
        .maxLength(10000)
        .build();
}

8.3 死信流转过程

普通消息流程:
Producer → Exchange → Queue → Consumer (成功)

死信流程:
Producer → Exchange → Queue → Consumer (失败)
                                   ↓
                            basi***ack(requeue=false)
                                   ↓
                           Dead Letter Exchange
                                   ↓
                            Dead Letter Queue
                                   ↓
                           Dead Letter Consumer

8.4 监听死信队列

// 位置:inventory-service/.../InventoryConsumer.java

@RabbitListener(queues = "dead-letter.queue")
public void handleDeadLetter(Message message, Channel channel) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    try {
        String messageBody = new String(message.getBody());
        log.error("【死信队列】接收到死信消息 => {}", messageBody);

        // 获取死信原因
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        String reason = (String) headers.get("x-first-death-reason");
        String queue = (String) headers.get("x-first-death-queue");
        
        log.error("死信原因:{}, 原队列:{}", reason, queue);

        // 处理死信消息:
        // 1. 保存到数据库,标记为失败
        saveToDeadLetterTable(messageBody, reason);
        
        // 2. 发送告警通知
        sendAlert("检测到死信消息", messageBody);
        
        // 3. 记录到日志文件
        logToFile(messageBody);

        // 确认消息
        channel.basicAck(deliveryTag, false);
        
    } catch (Exception e) {
        log.error("【死信队列】处理失败", e);
        // 死信队列的消息如果再次失败,直接拒绝(不再重试)
        channel.basi***ack(deliveryTag, false, false);
    }
}

8.5 在 RabbitMQ 管理界面查看

步骤:

  1. 访问 http://localhost:15672
  2. 登录(admin/admin123)
  3. 点击 “Queues” 标签
  4. 查看 dead-letter.queue
  5. 可以看到死信消息的数量和详情

9. 消息幂等性处理

9.1 什么是消息幂等性?

幂等性: 同一个消息被消费多次,结果应该是一样的

为什么会重复消费?

  1. 生产者重试导致消息重复发送
  2. 消费者处理完成但 ACK 失败,导致消息重新入队
  3. 网络抖动导致消息重发

不做幂等性处理的后果:

// ❌ 不幂等:库存重复扣减
订单A1次消费:库存 10099
订单A2次消费(重复):库存 9998  // 库存被重复扣减!
```

9.2 幂等性实现方案

方案1:唯一消息ID + Redis

原理: 使用 Redis 记录已处理的消息ID

实现步骤:

// 位置:inventory-service/.../InventoryConsumer.java

@Autowired
private StringRedisTemplate redisTemplate;

@RabbitListener(queues = "${rabbitmq.inventory.queue}")
public void handleInventoryReduce(Message message, Channel channel) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    try {
        // 1. 获取消息ID(由生产者生成)
        String messageId = message.getMessageProperties().getMessageId();
        
        // 2. 检查消息是否已处理(Redis)
        String key = "msg:consumed:" + messageId;
        Boolean isProcessed = redisTemplate.opsForValue().setIfAbsent(
            key, 
            "1", 
            24, 
            TimeUnit.HOURS
        );
        
        if (Boolean.FALSE.equals(isProcessed)) {
            // 消息已处理,直接确认
            log.warn("【幂等性】消息已处理,跳过 => messageId: {}", messageId);
            channel.basicAck(deliveryTag, false);
            return;
        }
        
        // 3. 解析消息
        InventoryReduceDTO dto = JSON.parseObject(
            new String(message.getBody()), 
            InventoryReduceDTO.class
        );
        
        // 4. 执行业务逻辑
        inventoryService.reduceInventory(
            dto.getOrderNo(), 
            dto.getProductId(), 
            dto.getQuantity()
        );
        
        // 5. 确认消息
        channel.basicAck(deliveryTag, false);
        
        log.info("【幂等性】消息处理成功 => messageId: {}", messageId);
        
    } catch (Exception e) {
        // 处理失败,删除 Redis 标记,允许重试
        String key = "msg:consumed:" + message.getMessageProperties().getMessageId();
        redisTemplate.delete(key);
        
        // 拒绝消息
        channel.basi***ack(deliveryTag, false, true);
    }
}

流程图:

接收消息
    ↓
检查 Redis 中是否存在 messageId
    ├─ 存在 → 消息已处理 → 直接ACK
    └─ 不存在 → 写入 Redis → 执行业务 → ACK
方案2:数据库唯一约束

原理: 利用数据库的唯一索引保证幂等性

数据库设计:

-- 库存流水表
CREATE TABLE t_inventory_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_no VARCHAR(50) NOT NULL UNIQUE,  -- 订单号唯一索引 ⭐
    product_id BIGINT NOT NULL,
    quantity INT NOT NULL,
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);

业务代码:

@Transactional
public void reduceInventory(String orderNo, Long productId, Integer quantity) {
    // 1. 检查是否已处理
    InventoryLog existLog = inventoryLogMapper.selectByOrderNo(orderNo);
    if (existLog != null) {
        log.warn("【幂等性】订单已处理 => orderNo: {}", orderNo);
        return;
    }
    
    // 2. 扣减库存(乐观锁)
    int updated = inventoryMapper.reduceStock(productId, quantity);
    if (updated == 0) {
        throw new BusinessException("库存不足");
    }
    
    // 3. 记录流水(唯一约束保证幂等)
    InventoryLog log = new InventoryLog();
    log.setOrderNo(orderNo);  // 唯一索引,重复插入会失败
    log.setProductId(productId);
    log.setQuantity(quantity);
    
    try {
        inventoryLogMapper.insert(log);
    } catch (DuplicateKeyException e) {
        // 重复订单,说明已处理
        log.warn("【幂等性】检测到重复订单 => orderNo: {}", orderNo);
        throw new BusinessException("订单已处理");
    }
}
方案3:业务逻辑天然幂等

适用场景: 某些业务操作本身就是幂等的

示例:

// ✅ 天然幂等:更新订单状态
// 多次执行结果一样
UPDATE t_order SET status = 2 WHERE order_no = 'ORD123456';

// ✅ 天然幂等:查询操作
SELECT * FROM t_order WHERE order_no = 'ORD123456';

// ❌ 非幂等:扣减库存(每次执行结果不同)
UPDATE t_inventory SET stock = stock - 1 WHERE product_id = 1001;

9.3 我们项目中的幂等性实现

库存服务: Redis + 数据库唯一约束

// 1. Redis 快速判重
String key = "order:processed:" + orderNo;
if (!redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS)) {
    return;  // 已处理
}

// 2. 数据库唯一索引兜底
try {
    inventoryLogMapper.insert(log);  // order_no 唯一索引
} catch (DuplicateKeyException e) {
    // 重复订单
}

通知服务: 允许重复发送(业务上可接受)

// 通知类消息通常允许重复
// 发送多次邮件/短信对业务影响不大
// 可以不做幂等性处理

10. 性能优化技巧

10.1 消费者并发配置

问题: 单个消费者处理速度慢,消息堆积

解决方案: 增加消费者并发数

spring:
  rabbitmq:
    listener:
      simple:
        # 最小消费者数量
        concurrency: 3
        # 最大消费者数量
        max-concurrency: 10
        # 每个消费者一次拉取的消息数
        prefetch: 1

配置说明:

参数 说明 推荐值
concurrency 初始消费者线程数 CPU核心数
max-concurrency 最大消费者线程数 CPU核心数 * 2
prefetch 每次拉取消息数(限流) 1-10

效果对比:

单消费者:
Queue [1000条消息] → Consumer (1个) → 处理时间: 100秒

并发消费者:
Queue [1000条消息] → Consumer (10个) → 处理时间: 10秒

10.2 批量确认消息

问题: 每条消息单独 ACK,网络开销大

解决方案: 批量确认

@RabbitListener(queues = "inventory.queue")
public void handleBatch(List<Message> messages, Channel channel) throws IOException {
    try {
        // 批量处理消息
        for (Message message : messages) {
            processMessage(message);
        }
        
        // 批量确认(multiple = true)
        long deliveryTag = messages.get(messages.size() - 1)
            .getMessageProperties()
            .getDeliveryTag();
        
        channel.basicAck(deliveryTag, true);  // true = 批量确认
        
    } catch (Exception e) {
        // 批量拒绝
        channel.basi***ack(deliveryTag, true, true);
    }
}

注意事项:

  • ⚠️ 批量确认时,如果有一条失败,可能导致所有消息重新消费
  • ✅ 适合消息处理速度快、失败率低的场景

10.3 消息预取(Prefetch)

配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 10  # 每个消费者预取10条消息

原理:

prefetch = 1(默认):
Consumer → 拉取1条 → 处理 → ACK → 拉取1条 → 处理 → ACK
         ↑_______________|              ↑_______________|
         网络往返                       网络往返

prefetch = 10:
Consumer → 拉取10条 → 处理10条 → ACK → 拉取10条
         ↑_____________________________|
         减少网络往返次数

建议值:

  • 消息处理速度快:prefetch = 10-100
  • 消息处理速度慢:prefetch = 1-5
  • 需要严格顺序:prefetch = 1

10.4 连接池优化

配置:

spring:
  rabbitmq:
    cache:
      channel:
        # Channel 缓存大小
        size: 50
        # 最大 Channel 数
        checkout-timeout: 30000

10.5 使用延迟队列优化

场景: 订单15分钟未支付自动取消

传统方式(轮询):

// ❌ 每分钟查询数据库,效率低
@Scheduled(cron = "0 * * * * ?")
public void checkUnpaidOrders() {
    List<Order> orders = orderMapper.selectUnpaid();
    for (Order order : orders) {
        if (order.isTimeout()) {
            cancelOrder(order);
        }
    }
}

RabbitMQ 延迟队列:

// ✅ 发送延迟消息,15分钟后自动消费
@Bean
public Queue orderTimeoutQueue() {
    return QueueBuilder.durable("order.timeout.queue")
        .ttl(900000)  // 15分钟
        .deadLetterExchange("order.cancel.exchange")
        .deadLetterRoutingKey("order.cancel")
        .build();
}

// 创建订单时发送延迟消息
rabbitTemplate.convertAndSend("order.timeout.queue", orderNo);

// 15分钟后自动消费
@RabbitListener(queues = "order.cancel.queue")
public void handleOrderTimeout(String orderNo) {
    // 检查订单状态,未支付则取消
    cancelOrderIfUnpaid(orderNo);
}

10.6 消息压缩

场景: 消息体很大(如订单详情包含大量商品信息)

// 发送前压缩
public void send***pressedMessage(String exchange, String routingKey, Object message) {
    String json = JSON.toJSONString(message);
    byte[] ***pressed = GzipUtil.***press(json);
    
    rabbitTemplate.convertAndSend(exchange, routingKey, ***pressed, msg -> {
        msg.getMessageProperties().setContentEncoding("gzip");
        return msg;
    });
}

// 接收后解压
@RabbitListener(queues = "large.message.queue")
public void handle***pressed(Message message) {
    String encoding = message.getMessageProperties().getContentEncoding();
    
    byte[] body = message.getBody();
    if ("gzip".equals(encoding)) {
        body = GzipUtil.de***press(body);
    }
    
    String json = new String(body);
    // 处理消息...
}

11. 监控和调试

11.1 RabbitMQ 管理界面

访问地址: http://localhost:15672

主要功能:

Overview(概览)
  • 📊 消息速率(发送/接收)
  • 🔗 连接数、通道数
  • 📮 队列数、交换机数
Connections(连接)
  • 查看所有客户端连接
  • 查看连接的IP、端口、用户
  • 可手动关闭连接
Channels(通道)
  • 每个连接的通道列表
  • 查看通道状态(idle/running)
  • 消息确认数、未确认数
Exchanges(交换机)
  • 所有交换机列表
  • 查看交换机类型、绑定关系
  • 可手动发送测试消息
Queues(队列)⭐ 重点
  • 队列消息数量(Ready、Unacked、Total)
  • 消息速率(in***ing、deliver)
  • 消费者数量
  • 可查看队列中的消息内容

实用操作:

1. 查看队列消息:

Queues → 点击队列名 → Get messages → Get Message(s)

2. 手动发送消息:

Exchanges → 点击交换机 → Publish message → 输入内容 → Publish message

3. 清空队列:

Queues → 点击队列名 → Purge Messages

4. 删除队列:

Queues → 点击队列名 → Delete

11.2 应用日志监控

在我们项目中添加详细日志:

@Slf4j
@***ponent
public class RabbitMQProducer {
    
    public void sendMessage(String exchange, String routingKey, Object message) {
        String messageId = UUID.randomUUID().toString();
        
        log.info("【发送消息】messageId: {}, exchange: {}, routingKey: {}, content: {}", 
                messageId, exchange, routingKey, JSON.toJSONString(message));
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("【消息确认】成功 => messageId: {}", correlationData.getId());
        } else {
            log.error("【消息确认】失败 => messageId: {}, cause: {}", 
                    correlationData.getId(), cause);
        }
    }
}
@Slf4j
@***ponent
public class InventoryConsumer {
    
    @RabbitListener(queues = "inventory.queue")
    public void handleMessage(Message message, Channel channel) {
        String messageId = message.getMessageProperties().getMessageId();
        
        log.info("【接收消息】messageId: {}, content: {}", 
                messageId, new String(message.getBody()));
        
        try {
            // 处理消息
            processMessage(message);
            
            channel.basicAck(deliveryTag, false);
            log.info("【消息处理】成功 => messageId: {}", messageId);
            
        } catch (Exception e) {
            log.error("【消息处理】失败 => messageId: {}, error: {}", 
                    messageId, e.getMessage(), e);
            channel.basi***ack(deliveryTag, false, true);
        }
    }
}

11.3 监控指标

关键指标:

指标 说明 正常值 告警阈值
Queue Ready 队列中待消费消息 < 100 > 1000
Queue Unacked 未确认消息数 < 10 > 100
Consumer Count 消费者数量 > 0 = 0(无消费者)
Publish Rate 消息发送速率 稳定 突然激增
Deliver Rate 消息消费速率 稳定 为0(消费停滞)
Memory Used 内存使用 < 80% > 90%

11.4 常用调试命令

Docker 环境查看 RabbitMQ 日志:

# 查看 RabbitMQ 容器日志
docker logs rabbitmq-demo

# 实时查看日志
docker logs -f rabbitmq-demo

# 查看最后100行
docker logs --tail 100 rabbitmq-demo

进入 RabbitMQ 容器:

# 进入容器
docker exec -it rabbitmq-demo bash

# 查看队列列表
rabbitmqctl list_queues

# 查看交换机列表
rabbitmqctl list_exchanges

# 查看绑定关系
rabbitmqctl list_bindings

# 查看连接
rabbitmqctl list_connections

# 查看消费者
rabbitmqctl list_consumers

12. 常见问题

12.1 消息丢失

问题描述: 发送的消息没有被消费

排查步骤:

1. 检查消息是否发送成功

// 查看生产者日志
2025-10-22 10:00:00 INFO  【RabbitMQ生产者】发送消息 => messageId: xxx
2025-10-22 10:00:00 INFO  【RabbitMQ生产者】消息确认成功 => messageId: xxx

2. 检查 RabbitMQ 管理界面

  • Queues → 查看队列消息数量
  • 如果 Ready = 0,说明消息已被消费或未到达队列

3. 检查路由配置

// 检查 exchange、routing-key 是否正确
rabbitTemplate.convertAndSend(
    "inventory.exchange",      // ✅ 正确的交换机名
    "inventory.reduce",        // ✅ 正确的 routing key
    message
);

4. 检查消费者

# 查看消费者数量
rabbitmqctl list_consumers

# 如果消费者数量 = 0,说明消费者没有启动

5. 检查消费者日志

// 是否有接收消息的日志
2025-10-22 10:00:01 INFO  【RabbitMQ消费者】接收到消息 => xxx

12.2 消息堆积

问题描述: 队列中消息越来越多,消费不过来

原因分析:

  1. 消费者处理速度慢
  2. 消费者数量不足
  3. 消费者宕机

解决方案:

1. 增加消费者并发数

spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 10  # 增加到10个消费者
        max-concurrency: 20

2. 优化业务代码

// ❌ 慢:每次查询数据库
for (Item item : items) {
    Product product = productMapper.selectById(item.getProductId());
}

// ✅ 快:批量查询
List<Long> productIds = items.stream()
    .map(Item::getProductId)
    .collect(Collectors.toList());
List<Product> products = productMapper.selectByIds(productIds);

3. 水平扩展(增加服务实例)

# 启动多个消费者实例
java -jar inventory-service.jar --server.port=8082
java -jar inventory-service.jar --server.port=8092
java -jar inventory-service.jar --server.port=8102

12.3 重复消费

问题描述: 同一条消息被消费多次

原因:

  • 消费者处理完成但 ACK 失败
  • 网络抖动导致超时重发

解决方案: 实现幂等性(见第9章)

12.4 消息顺序问题

问题描述: 消息乱序消费

原因:

  • 多个消费者并发消费
  • 消息重试导致顺序混乱

解决方案:

1. 单消费者 + prefetch=1

spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 1  # 只有1个消费者
        prefetch: 1     # 每次只拉取1条

2. 使用消息优先级

@Bean
public Queue priorityQueue() {
    return QueueBuilder.durable("order.queue")
        .maxPriority(10)  // 设置最大优先级
        .build();
}

// 发送消息时设置优先级
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
    msg.getMessageProperties().setPriority(5);  // 优先级 0-10
    return msg;
});

3. 业务层保证顺序

// 使用版本号
UPDATE t_order SET status = 2, version = version + 1 
WHERE order_no = 'ORD123' AND version = 1;

12.5 连接失败

问题描述:

Caused by: java.***.ConnectException: Connection refused

排查步骤:

1. 检查 RabbitMQ 是否启动

docker ps | grep rabbitmq

# 如果没有,启动容器
docker-***pose up -d

2. 检查端口是否开放

# Windows
***stat -ano | findstr 5672

# Linux
***stat -tunlp | grep 5672

3. 检查配置

spring:
  rabbitmq:
    host: localhost  # 确保正确
    port: 5672       # 确保正确
    username: admin
    password: admin123

4. 防火墙问题

# 关闭防火墙(仅测试环境)
# Windows: 控制面板 → 防火墙 → 关闭
# Linux: systemctl stop firewalld

12.6 队列未创建

问题描述:

Caused by: ShutdownSignalException: channel error; 
protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND)

原因: 队列不存在

解决方案:

1. 确保 RabbitAdmin Bean 存在

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}

2. 先启动生产者(声明队列),再启动消费者

启动顺序:
1. order-service (生产者,声明队列)
2. inventory-service (消费者)
3. notification-service (消费者)

3. 手动创建队列

  • 访问 http://localhost:15672
  • Queues → Add a new queue
  • 输入队列名、配置参数
  • Add queue

13. 总结与展望

13.1 学习总结

通过这个项目,我们学习了:

✅ RabbitMQ 核心概念

  • Exchange、Queue、Binding
  • 四种交换机类型(Direct、Topic、Fanout、Headers)
  • 生产者、消费者模型

✅ Spring Boot 集成 RabbitMQ

  • 配置 RabbitMQ 连接
  • 声明 Exchange、Queue、Binding
  • 使用 RabbitTemplate 发送消息
  • 使用 @RabbitListener 接收消息

✅ 消息可靠性保证

  • 生产者确认(Publisher Confirm)
  • 生产者退回(Publisher Return)
  • 消息持久化(Exchange、Queue、Message)
  • 消费者手动确认(ACK/NACK)

✅ 高级特性

  • 死信队列(处理失败消息)
  • 消息幂等性(防止重复消费)
  • 延迟队列(定时任务)
  • 消息优先级

✅ 性能优化

  • 消费者并发配置
  • 批量确认消息
  • 消息预取(Prefetch)
  • 连接池优化

✅ 监控调试

  • RabbitMQ 管理界面
  • 应用日志监控
  • 常见问题排查

13.2 项目亮点

🎯 完整的业务场景

  • 订单、库存、通知三大核心服务
  • 真实的电商业务流程
  • 前后端完整实现

🐳 一键启动环境

  • Docker ***pose 管理基础设施
  • 无需手动安装 RabbitMQ、MySQL、Redis
  • 数据库自动初始化

📦 生产级代码

  • 消息确认、重试、死信队列
  • 幂等性处理、事务保证
  • 详细的日志和异常处理

📚 详细的文档

  • 启动指南
  • Docker 配置详解
  • 学习笔记(本文档)
转载请说明出处内容投诉
CSS教程网 » RabbitMQ 实战教程:从零构建微服务电商订单系统

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买