rabbitmq---延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后才收到消息。
延迟任务:设置在一定时间之后才执行的任务。
延迟消息有以下三种实现方案:
- 死信交换机
- 延迟消息插件
一、延迟队列
TTL
- TTL 全称 Time To Live(存活时间/过期时间)。
- 当消息到达存活时间后,还没有被消费,会被自动清除。
- RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
死信交换机
成为死信(dead letter)的条件:
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false(消费者拒接消费消息,并且不重回队列;)
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 队列消息堆积已满,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
1、声明延迟队列
package ***.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.security.auth.login.CredentialNotFoundException;
@Configuration
public class DlxExchangeConfiguration {
/**
* 声明 TTL 队列
* 1. 指定消息的 TTL
* 2. 指定死信交换机
* 3. 指定死信交换机的 RoutingKey
*/
@Bean
public Queue ttlQueue() {
return QueueBuilder
.durable("ttl.queue") // 指定队列的名称
//.ttl(10000) // 指定 TTL 为 10 秒,这里可设置过期时间,但我这里测试在发送消息时设置过期时间
.deadLetterExchange("dlx.direct") // 指定死信交换机
.deadLetterRoutingKey("dlx") // 指定死信交换机的 RoutingKey
.build();
}
/**
* 声明TTl交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("ttl.direct");
}
/**
* 声明ttl交换机与队列的关联关系
* @return
*/
@Bean
public Binding directBinding(){
return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("ttl");
}
/**
* 声明死信交换机
* @return
*/
@Bean
public DirectExchange dlxDirect(){
return new DirectExchange("dlx.direct");
}
/**
* 声明死信队列
* @return
*/
@Bean
public Queue dlxQueue(){
return new Queue("dlx.queue");
}
/**
* 声明死信交换机与队列关联关系
* @return
*/
@Bean
public Binding tlxBinding(){
return BindingBuilder.bind(dlxQueue()).to(dlxDirect()).with("dlx");
}
}
2、发送消息
@Test
void testSendTTLMessage(){
rabbitTemplate.convertAndSend("ttl.direct", "ttl", "hello", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
});
log.info("消息发送成功!");
}
3、死信队列消费消息
@RabbitListener(queues = "dlx.queue") //监听的队列:dlx.queue
public void listenDlxQueue(String msg){
log.info("dlx.queue的消息:【"+msg+"】");
}
4、结果比对
4.1、发送时间,设置10s过期
4.2、死信队列消费消息时间
二、延迟消息插件
RabbitMQ官方推出的插件,原生支持延迟消息的功能。其原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
本地RabbitMQ官网下载rabbitmq_delayer_message_exchange插件地址:
https://www.rabbitmq.***/***munity-plugins.html
代码实现:
声明延迟交换机方式一:
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct")
.delayed() //设置delay的属性为true
.durable(true)
.build();
}
方式二:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(value = "delay.direct",delayed = "true"), //delayed 这是延迟交换机开启
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}",msg);
}
测试发送消息
@Test
void testSendDelayMessage(){
rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(1000);
return message;
}
});
log.info("消息发送成功!");
}
案例
电商项目设置30分钟后检测订单支付状态,并完成取消超时订单
设置30分钟检查订单状态存在两个问题:
- 如果并发较高,30分钟可能堆积消息过多,对MQ压力过大
- 大多数订单再下单后很短时间内就会支付,但是却需要在MQ内等待30分钟,浪费资源。
解决措施:设置多个延迟消息交换机,如设置不同的等待时间:10s、10s、10s、15s、15s....,这些时间相加得到30分钟,不同延迟时间过滤掉大部分的消息,给MQ减压
首先先查询支付状态,判断是否支付,如果状态显示未支付,则获取下次延迟时间,判断是否有延迟时间,有则重发延迟消息,没有延迟消息则取消订单。如果订单显示已支付,则标记未已支付。
定义延迟时间集合及相关方法:
package ***.itheima.consumer.config;
import lombok.Data;
import java.util.Arrays;
import java.util.List;
@Data
public class MultiDelayMessage<T>{
//消息体
private T data;
//记录延迟消息时间的集合
private List<Long> delayMillis;
public MultiDelayMessage(T data, List<Long> delayMillis){
this.data = data;
this.delayMillis = delayMillis;
}
public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
return new MultiDelayMessage<>(data, Arrays.asList(delayMillis));
}
//获取并移除下一个延迟时间
//Returns: 队列中的第一个延迟时间
public Long removeNextDelay(){
return delayMillis.remove(0);
}
//是否还有下一个延迟时间
public boolean hasNextDelay(){
return !delayMillis.isEmpty();
}
}
定义交换机、队列名称以及key:
public interface MqConstants {
String DELAY_EXCHANGE = "trade.delay.topic"; //交换机名称
String DELAY_ORDER_QUEUE = "trade.order.delay.queue"; //队列名称
String DELAY_PRDER_ROUTING_KEY = "order.query"; //key
}
定义延迟消息体:
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {
private final int delay;
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delay);
return null;
}
}
实现发送延迟消息:
/*
* 生成订单后,发送延迟检查订单消息
*/
public void testOrderStatic(){
try{
MultiDelayMessage<String> msg = MultiDelayMessage.of("这里是订单ID", 1000L, 1000L, 1000L, 1500L, 1500L); //这里的消息体是订单ID,后面是延迟消息时间集合
rabbitTemplate.convertAndSend(
MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_PRDER_ROUTING_KEY,msg,
new DelayMessageProcessor(msg.removeNextDelay().intValue())
);
}catch (AmqpException e){
log.error("延迟消息发送失败!");
}
}
消费延迟消息的大概思路:
import ***.itheima.constants.DelayMessageProcessor;
import ***.itheima.constants.MqConstants;
import ***.itheima.constants.MultiDelayMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@RequiredArgsConstructor
public class OrderStatusCheckListener {
private RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true" ,type = ExchangeTypes.TOPIC),
key = MqConstants.DELAY_PRDER_ROUTING_KEY
))
public void listenOderDelayMessage(MultiDelayMessage<Long> msg){
//1、查询订单状态
//2、判断是否已支付
//2.1订单不存在或者已经处理(订单显示已支付)----交易服务显示已支付,则表示已支付,直接return
//(交易服务显示未支付的情况下)3、去支付服务查询真正的支付状态 ---- 这里是在交易服务查询显示未支付,但不一定是未支付,需要去支付服务查询确定一下
//3.1、判断支付服务的订单支付状态,已支付则标记订单状态为已支付,直接return
//4、判断是否存在延迟时间
if (msg.hasNextDelay()){
//4.1、存在,则重发延迟消息
Long nestDelay = msg.removeNextDelay();
rabbitTemplate.convertAndSend(
MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_PRDER_ROUTING_KEY,
msg, new DelayMessageProcessor(nestDelay.intValue())
);
return;
}
//5、不存在,取消订单,修改订单状态为取消订单
//6、恢复库存
}
}