目录
交换机类型
直连交换机:Direct exchange
主题交换机:Topic exchange
扇形交换机:Fanout exchange
首部交换机:Headers exchange
死信交换机:Dead Letter Exchange
交换机的属性
代码实战
直连(Direct实践)
主题 (Topic实践)
扇形(Fanout实践)
Exchange
在RabbitMQ中,生产者发送信息不会直接将消息投送到队列中,而是先将消息投递到交换机中,在由交换机转换到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。
生产者将消息发送到Exchange,由Exchange再路由到一个或多个队列中
路由键(RoutingKey)
生产者将信息发送给交换机的时候,会指定RoutingKey指定路由规则
绑定键(BindingKey)
通过绑定键将交换机与队列关联起来,这样RabbitMQ就知道如何正确的将信息路由到队列
交换机类型
直连交换机:Direct exchange
Direct,完全匹配型交换机,此种类型交换机,通过RoutingKey路由键将交换机和队列进行绑定, 消息被发送到exchange时,需要根据消息的RoutingKey,来进行匹配,只将消息发送到完全匹配到此RoutingKey的队列。
如图,不同的key绑定不同的队列,实现不同消息分发至不同队列。
注意:同一个key,可以绑定多个queue队列。如图中,当匹配到key1时,则会将消息分发送至queue1和queue2,这样两个队列都会有相同的消息数据。
主题交换机:Topic exchange
Topic,主题类型交换机,此种交换机与Direct类似,也是需要通过routingkey路由键进行匹配分发,区别在于Topic可以进行模糊匹配,Direct是完全匹配。
Topic中,将routingkey通过"."来分为多个部分,通过如下功能字符来进行匹配:
- "*":代表一个部分
- "#":代表一个或多个部分
举个例子,加入绑定关系如下图:
然后发送一条信息,routingkey为"a.b.c.d",那么根据"."将这个路由键分为了4个部分,此条路由键,将会匹配:
- a.b.c. :成功匹配,因为可以代表一个部分
- a.b.# :成功匹配,因为#可以代表一个或多个部分
- a..c.. : 成功匹配,因为第一和第三部分分别为a和c,且为4个部分,刚好匹配
注意:如果绑定的路由键为 "#" 时,则接受所有消息,因为路由键所有都匹配
扇形交换机:Fanout exchange
Fanout,扇出类型交换机,此种交换机,会将消息分发给所有绑定了此交换机的队列,此时RoutingKey参数无效。
此种方式,最简单快速,性能最好,因为少了中间的匹配判断环节。
首部交换机:Headers exchange
Headers,headers信息类型交换机,此类型交换机不通过routingkey路由键来分发消息,而是通过消息内容中的headers属性来进行匹配。headers类型交换器性能差,在实际中并不常用。
虽然不常用,但也可以了解一下其,此种交换机不通过routingkey,但是通过headers进行绑定,也就是在声明binding绑定关系时,需要传入需要匹配的header的key/value键值对。
死信交换机:Dead Letter Exchange
当一个队列中的消息满足下列情况之一。就可称为死信。
消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机。
交换机的属性
Name:交换机名称
Type:交换机类型,direct、topic、fanout、headers
durability:是否需要持久化,如果持久性,则RabbitMQ重启后,交换机还存在
Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
Arguments:扩展参数,用于扩展AMQP协议定制化使用
代码实战
直连(Direct实践)
先写两个队列和自定义一个直连交换机
@Bean
public Queue queue1(){
return new Queue("queue1");
}
@Bean
public Queue queue2(){
return new Queue("queue2");
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
给交换机分别绑定一个队列
@Bean
public Binding binding01(){
return BindingBuilder.bind(queue1()).to(directExchange()).with("aa");
}
@Bean
public Binding binding02(){
return BindingBuilder.bind(queue2()).to(directExchange()).with("bb");
}
在控制类给交换机发送一条消息
@RequestMapping("/send2")
public String send2 () throws Exception{
template.convertAndSend("directExchange","aa","hello");
return "😒";
}
写两个测试类进行测试
package ***.example.consumer;
import ***.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.***ponent;
@***ponent
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues="queue1")
public class ReceiverQ1 {
@RabbitHandler
public void process(String msg){
log.warn("Q1接收到:" + msg);
}
}
package ***.example.consumer;
import ***.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.***ponent;
@***ponent
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues="queue2")
public class ReceiverQ2 {
@RabbitHandler
public void process(String msg){
log.warn("Q2接收到:" + msg);
}
}
接着先启动生产者再启动消费者,但是可能会报错
我们需要先去访问一下
可以发现在RabbitMQ中,出现了我们自定义的那个交换机的名称,队列中也有
再重新启动消费者就不会报错了
主题 (Topic实践)
在这里和上面的步骤一样,在这里测试RoutingKey的一个部分和多个部分
//主题交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Binding binding03(){
return BindingBuilder.bind(queue1()).to(topicExchange()).with("*.*.aa");
}
@Bean
public Binding binding04(){
return BindingBuilder.bind(queue2()).to(topicExchange()).with("*.*.bb");
}
@Bean
public Binding binding05(){
return BindingBuilder.bind(queue1()).to(topicExchange()).with("mq.#");
}
@Bean
public Binding binding06(){
return BindingBuilder.bind(queue2()).to(topicExchange()).with("mq.#");
}
@RequestMapping("/send3")
public String send3 (String rex){
template.convertAndSend("topicExchange",rex,"hello");
return "😒";
}
在页面上运行a.a.aa可以发现这个是进入到了Q1
运行a.a.bb是进入到了Q2
如果两个都想要接收到,就可以在页面上输入mq.a.a
扇形(Fanout实践)
步骤同上
//扇形交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding binding07(){
return BindingBuilder.bind(queue1()).to(fanoutExchange());
}
@Bean
public Binding binding08(){
return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
@RequestMapping("/send4")
public String send4(){
template.convertAndSend("fanoutExchange","","hello");
return "😒";
}
在页面上运行可以发现两个都可以出来