目录
一、前期项目环境准备
1.1父项目以及子项目
1.2配置pom.xml
1.3配置application.yml
二、扇出(Fanout) 交换机实现消息的发送和接收
2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息
2.1.1consumer子项目结构
2.1.2FanoutConfig类的实现扇出(Fanout)交换机、队列以及交换机和队列的绑定的创建
2.1.3springRabbitListener类实现Fanout消息的接收
2.1.4运行consumer子项目
2.2编写子项目publisher的代码实现扇出(Fanout)交换机的发送消息
2.2.1publisher子项目结构
2.2.2编写SpringAmqpTest测试类代码
2.2.3运行该测试类,然后回到consumer子项目的两个队列是否都能够接收到该消息
三、订阅(Direct)交换机实现消息的发送和接收
3.1编写子项目consumer(消费者,接收消息)的代码实现扇出订阅(Direct)交换机接收消息
3.1.1在springRabbitListener类添加以下两个Direct队列方法
3.1.2再次重新运行consumer子项目
3.2编写子项目publisher的代码实现扇出(Direct)交换机的发送消息
3.2.1在SpringAmqpTest测试类继续添加下面代码
3.2.2运行该测试类,然后回到consumer子项目的key拥有red的监听队列是否都够接收到该消息
四、主题(Topic)交换机实现消息的发送和接收
4.1编写子项目consumer(消费者,接收消息)的代码实现扇出主题(Topic)交换机接收消息
4.1.1在springRabbitListener类添加以下两个Topic队列方法
4.1.2再次重新运行consumer子项目
4.2编写子项目publisher的代码实现扇出(Topic)交换机的发送消息
4.2.1在SpringAmqpTest测试类继续添加下面代码
4.2.2运行该测试类,然后回到consumer子项目的key拥有.news的监听队列是否都够接收到该消息
五、总结
5.1交换机的作用是什么?
5.2声明队列、交换机、绑定关系的Bean是什么?
5.3描述下Direct交换机与Fanout交换机的差异?
5.4基于@RabbitListener注解声明队列和交换机有哪些常见注解?
5.5描述下Direct交换机与Topic交换机的差异?
一、前期项目环境准备
1.1父项目以及子项目
创建父项目mq-demo,以及两个子项目publisher(生产者,发送消息)和consumer(消费者,接收消息)
1.2配置pom.xml
在父项目mq-demo的pom.xml引入以下的amqp依赖
<!--AMQP依赖,包含rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.3配置application.yml
在两个子项目的application.yml都进行以下的配置(设置连接参数,分别是:主机名、端口号、用户名、密码、virtual-host):
spring:
rabbitmq:
host: 192.168.22.134
port: 5672 #发送信息的端口,不是启动rabbitmq管理页面的端口(15672)
username: itcast
password: 123321
virtual-host: /
二、扇出(Fanout) 交换机实现消息的发送和接收
在扇出(广播)模式下,消息发送流程是这样的:
-
1) 可以有多个队列
-
2) 每个队列都要绑定到Exchange(交换机)
-
3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
-
4) 交换机把消息发送给绑定过的所有队列
-
5) 订阅队列的消费者都能拿到消息
2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息
2.1.1consumer子项目结构
其中config包是配置包,listener包是监听消息以便接收的包
2.1.2FanoutConfig类的实现扇出(Fanout)交换机、队列以及交换机和队列的绑定的创建
分别创建一个名为itcast.fanout的交换机、一个名为fanout.queue1的队列、一个名为fanout.queue2d1队列以及交换机和两个队列之间的绑定。
java">@Configuration
public class FanoutConfig {
// 声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
// 声明第1个队列
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//绑定队列1和交换机
@Bean
public Binding fanoutBing1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 声明第2个队列
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
//绑定队列2和交换机
@Bean
public Binding fanoutBing2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
2.1.3springRabbitListener类实现Fanout消息的接收
利用@RabbitListeneer注解的queues属性="队列名称",来对队列的消息进行监听。
@***ponent
public class springRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void ListenerFanoutQueue1(String msg) {
System.out.println("消费者接收到fanout.queue1的消息【"+msg+"】");
}
@RabbitListener(queues = "fanout.queue2")
public void ListenerFanoutQueue2(String msg) {
System.out.println("消费者接收到fanout.queue2的消息【"+msg+"】");
}
}
2.1.4运行consumer子项目
在http://192.168.22.134:15672/(这里输入自己的ip地址和自己的RabbitMQ端口号)查看名为itcast.fanout交换机、名为fanout.queue1队列和名为fanout.queue2是否创建成功。
2.2编写子项目publisher的代码实现扇出(Fanout)交换机的发送消息
2.2.1publisher子项目结构
publisher子项目结构比consumer子项目结构而言相对简单了,在publisher子项目结构中我们只需编写@Test类来完成消息的发送,然后看看consumer子项目是否能够接收到该消息即可。
2.2.2编写SpringAmqpTest测试类代码
需要指定交换机的名称和发送的消息,然后利用rabbitTemplate.convertAndSend()方法进行消息的发送
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendFanoutExchange(){
//交换机名称
String exchangeName ="itcast.fanout";
//消息
String message = "hello, every one!";
// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
}
2.2.3运行该测试类,然后回到consumer子项目的两个队列是否都能够接收到该消息
三、订阅(Direct)交换机实现消息的发送和接收
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) -
消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 -
Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
3.1编写子项目consumer(消费者,接收消息)的代码实现扇出订阅(Direct)交换机接收消息
为了编码的方便,我们就直接在子项目consumer下的listener包里面的springRabbitListener类使用注解的方式进行交换机、队列的创建和消息的监听了。
3.1.1在springRabbitListener类添加以下两个Direct队列方法
这里我们直接使用@RabbitListener注解进行交换机、队列的创建和消息的监听。这里不是使用binding而是使用bindings进行构建,其中:
@Queue里面的name属性是队列的名称
@Exchange里面的name属性是交换机的名称,type属性是交换机的类型(这里是direct类型)
key属性是指定消息的 RoutingKey,接收拥有相同的RoutingKey队列的消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct",
type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void ListenerDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct",
type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void ListenerDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息【"+msg+"】");
}
3.1.2再次重新运行consumer子项目
在http://192.168.22.134:15672/(这里输入自己的ip地址和自己的RabbitMQ端口号)查看名为itcast.direct交换机、名为direct.queue1队列和名为direct.queue2是否创建成功。
3.2编写子项目publisher的代码实现扇出(Direct)交换机的发送消息
为了编码的方便,我们就直接在子项目publisher下的SpringAmqpTest测试类进行编码。
3.2.1在SpringAmqpTest测试类继续添加下面代码
这里的 rabbitTemplate.convertAndSend()方法相对于Fanout的测试类来说,要在中间多填一个参数的名称,该名称就是RoutingKey(指定队列key),向监听消息队列也是该key的发送消息,比如这里的发送消息的RoutingKey为red,那么只有在监听消息队列的key也拥有red才能接收到这个消息
@Test
public void testSendDirectExchange(){
//交换机名称
String exchangeName ="itcast.direct";
//消息
String message = "hello, red!";
// 发送消息,参数分别是:交互机名称、RoutingKey(指定队列key)、消息
rabbitTemplate.convertAndSend(exchangeName,"red",message);
}
3.2.2运行该测试类,然后回到consumer子项目的key拥有red的监听队列是否都够接收到该消息
从上面的3.1.1编码可知,我们两个队列的key都拥有red,所以两个队列都接受到了该消息
四、主题(Topic)交换机实现消息的发送和接收
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
解释:
Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather
Queue4:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news
4.1编写子项目consumer(消费者,接收消息)的代码实现扇出主题(Topic)交换机接收消息
为了编码的方便,我们就直接在子项目consumer下的listener包里面的springRabbitListener类使用注解的方式进行交换机、队列的创建和消息的监听了。
4.1.1在springRabbitListener类添加以下两个Topic队列方法
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic",
type = ExchangeTypes.TOPIC),
key = "China.#"
))
public void ListenerTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic",
type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void ListenerTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息【"+msg+"】");
}
这里我们直接使用@RabbitListener注解进行交换机、队列的创建和消息的监听。这里不是使用binding而是使用bindings进行构建,其中:
@Queue里面的name属性是队列的名称
@Exchange里面的name属性是交换机的名称,type属性是交换机的类型(这里是topic类型)
key属性是指定消息的 RoutingKey,接收拥有相同的RoutingKey队列的消息,
但是不同的是,这里我们可以使用#或者*的通配符进行更方便的匹配发送和接收消息的队列
。
4.1.2再次重新运行consumer子项目
在http://192.168.22.134:15672/(这里输入自己的ip地址和自己的RabbitMQ端口号)查看名为itcast.topic交换机、名为topic.queue1队列和名为topic.queue2是否创建成功。
4.2编写子项目publisher的代码实现扇出(Topic)交换机的发送消息
为了编码的方便,我们就直接在子项目publisher下的SpringAmqpTest测试类进行编码。
4.2.1在SpringAmqpTest测试类继续添加下面代码
这里的 rabbitTemplate.convertAndSend()方法相对于Fanout的测试类来说,要在中间多填一个参数的名称,该名称就是RoutingKey(指定队列key),向监听消息队列也是该key的发送消息,比如这里的发送消息的RoutingKey为USA.news,那么只有在监听消息队列的key也拥有.news才能接收到这个消息
@Test
public void testSendTopicExchange(){
//交换机名称
String exchangeName ="itcast.topic";
//消息
String message = "今天是个好日子!";
// 发送消息,参数分别是:交互机名称、RoutingKey(通配符(*代表一个,#代码0个或多个))、消息
//rabbitTemplate.convertAndSend(exchangeName,"China.news",message);//两个队列都收到消息
//rabbitTemplate.convertAndSend(exchangeName,"China.weather",message);//队列1收到消息
rabbitTemplate.convertAndSend(exchangeName,"USA.news",message);//队列2收到消息
}
4.2.2运行该测试类,然后回到consumer子项目的key拥有.news的监听队列是否都够接收到该消息
从上面的3.1.1编码可知,只有队列2是#.news,所以只有对了2接受到了该消息
五、总结
5.1交换机的作用是什么?
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
不能缓存消息,路由失败,消息丢失
FanoutExchange的会将消息路由到每个绑定的队列
5.2声明队列、交换机、绑定关系的Bean是什么?
Queue、FanoutExchange、Binding
5.3描述下Direct交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
5.4基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue和@Exchange
5.5描述下Direct交换机与Topic交换机的差异?
Topic交换机接收的消息RoutingKey必须是多个单词,以.分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词
*
:代表1个词
今天的知识就分享到这了,希望能够帮助到你~