一、rocketmq 的基本概念
1.消息模型(Message Model)
RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。
2.消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。
3.消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
4.主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。
5.代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
6.名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。
特性
- 同步发送消息
- 异步发送消息
- 以单向模式发送消息
- 发送有序消息
- 发送批量消息
- 发送交易消息
- 发送具有延迟级别的预定消息
- 以并发模式(广播/集群)消费消息
- 消费有序消息
- 使用标记或 sql92 表达式过滤消息
- 支持消息追踪
- 支持认证和授权
- 支持请求-回复消息交换模式
- 使用推/拉模式消费消息
集成
直接使用官方的 rocketmq-spring-boot-starter
1.添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>***.chaoyue</groupId>
<artifactId>rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.1.1版本,内置的 rocketmq-client 版本为4.7.1应该与你 rocketmq 服务保持一致。
2.添加配置
application.yml 文件中添加如下配置:
server:
port: 8081
rocketmq:
# NameServer
name-server: 192.168.1.38:9876
# 默认的消息组
producer:
group: group1
3.新建生产者类 RocketMQProducer
注入 RocketMQTemplate
package ***.chaoyue.rocketmq.producer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.***ponent;
@***ponent
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送消息
public void sendMessage(String topic, String msg) {
rocketMQTemplate.convertAndSend(topic, msg);
}
}
4.新建消费者类 RocketMQConsumer
package ***.chaoyue.rocketmq.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.***ponent;
/**
* 泛型可选
*/
@***ponent
@RocketMQMessageListener(consumerGroup = "group1", topic = "topic1")
public class RocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : " + message);
}
}
5.新建控制类 RocketMQController
package ***.chaoyue.rocketmq.controller;
import ***.chaoyue.rocketmq.producer.RocketMQProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/RocketMQ")
public class RocketMQController {
private final String topic = "topic1";
@Resource
private RocketMQProducer producer;
@RequestMapping("/sendMessage")
public String sendMessage(String message) {
producer.sendMessage(topic, message);
return "消息已发送";
}
}
6.启动服务并测试
启动服务后,浏览器输入:http://localhost:8081/RocketMQ/sendMessage?message=hi,sendmessage:
生产者发送消息
同步发送消息
可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等。
// 发送字符串
rocketMQTemplate.syncSend("springTopic", "Hello, World!");
// 同步发送
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
// 发送对象
rocketMQTemplate.syncSend("userTopic", new User().setUserAge((byte) 18).setUserName("Kitty"));
// 发送spring 消息
rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(
new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
异步发送消息
异步传输一般用于响应时间敏感的业务场景。
rocketMQTemplate.asyncSend("orderPaidTopic", "异步发送", new SendCallback() {
@Override
public void onSu***ess(SendResult var1) {
// 成功回调
System.out.printf("async onSucess SendResult=%s %n", var1);
}
@Override
public void onException(Throwable var1) {
// 失败回调
System.out.printf("async onException Throwable=%s %n", var1);
}
});
单向发送消息
单向传输用于需要中等可靠的情况,例如日志收集
rocketMQTemplate.sendOneway("springTopic", "Hello, World!");
发送有序消息
rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
发送事务消息
Message msg = MessageBuilder.withPayload("rocketMQTemplate transactional message ").build();
// 第一个参数必须与@RocketMQTransactionListener的成员字段'transName'相同
rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
// 使用注解@RocketMQTransactionListener定义事务监听器
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return bollback, ***mit or unknown
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return bollback, ***mit or unknown
return RocketMQLocalTransactionState.***MIT;
}
}
发送特殊标签(tag)消息
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0"); // tag0 不是消费者选择的,可以通过tag过滤掉
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
发送批量消息
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
Push 模式
消费者
@Slf4j
@Service
@RocketMQMessageListener(topic = "laker-123", consumerGroup = "laker_consumer_group")
public class MyConsumer1 implements RocketMQListener<String> {
public void onMessage(String message) {
log.info("received message: {}", message);
}
}
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
public class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
public void onMessage(OrderPaidEvent orderPaidEvent) {
log.info("received orderPaidEvent: {}", orderPaidEvent);
}
}
Pull 模式
从RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费
修改application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
编写代码
@SpringBootApplication
public class ConsumerApplication implements ***mandLineRunner {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public void run(String... args) throws Exception {
//This is an example of pull consumer using rocketMQTemplate.
List<String> messages = rocketMQTemplate.receive(String.class);
System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
}
}
高级
请求应答语义
RocketMQ-Spring 提供 请求/应答 语义支持。
producer 端
发送Request消息使用SendAndReceive方法
同步发送需要在方法的参数中指明返回值类型
异步发送需要在回调的接口中指明返回值类型
// 同步发送request并且等待String类型的返回值
String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
// 异步发送request并且等待User类型的返回值
rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
@Override public void onSu***ess(User message) {
System.out.printf("send user object and receive %s %n", message.toString());
}
@Override public void onException(Throwable e) {
e.printStackTrace();
}
}, 5000);
Consumer端
需要实现RocketMQReplyListener<T, R> 接口,其中T表示接收值的类型,R表示返回值的类型。
@SpringBootApplication
public class ConsumerApplication{
public static void main(String[] args){
SpringApplication.run(ConsumerApplication.class, args);
}
@Service
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
return "reply string";
}
}
@Service
@RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{
public void onMessage(User user) {
System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
User replyUser = new User("replyUserName",(byte) 10);
return replyUser;
}
}
@Data
@AllArgsConstructor
public class User implements Serializable{
private String userName;
private Byte userAge;
}
}
ACL功能
Producer 端要想使用 ACL 功能,需要多配置两个配置项:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.a***ess-key=AK
rocketmq.producer.secret-key=SK
Consumer 端 ACL 功能需要在 @RocketMQMessageListener
中进行配置
@Service
@RocketMQMessageListener(
topic = "test-topic-1",
consumerGroup = "my-consumer_test-topic-1",
a***essKey = "AK",
secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
...
}
注意:
可以不用为每个@RocketMQMessageListener
注解配置 AK/SK,在配置文件中配置rocketmq.consumer.a***ess-key
和rocketmq.consumer.secret-key
配置项,这两个配置项的值就是默认值
消息轨迹
Producer 端要想使用消息轨迹,需要多配置两个配置项:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic
Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener
中进行配置对应的属性:
@Service
@RocketMQMessageListener(
topic = "test-topic-1",
consumerGroup = "my-consumer_test-topic-1",
enableMsgTrace = true,
customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
...
}
注意:
默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且trace-topic
为RMQ_SYS_TRACE_TOPIC
Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置rocketmq.consumer.customized-trace-topic
配置项,不需要为在每个@RocketMQMessageListener
配置。
阿里云消息轨迹正常显示需要设置a***essChannel配置为CLOUD。
常见问题
-
生产环境有多个
nameserver
该如何连接?rocketmq.name-server
支持配置多个nameserver
地址,采用;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876
-
rocketMQTemplate
在什么时候被销毁?
开发者在项目中使用rocketMQTemplate
发送消息时,不需要手动执行rocketMQTemplate.destroy()
方法,rocketMQTemplate
会在spring容器销毁时自动销毁。 -
启动报错:
Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please
RocketMQ
在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup
下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。建议consumerGroup
与topic
一一对应。 -
发送的消息内容体是如何被序列化与反序列化的?
RocketMQ
的消息体都是以byte[]
方式存储。当业务系统的消息内容体如果是java.lang.String
类型时,统一按照utf-8
编码转成byte[]
;如果业务系统的消息内容为非java.lang.String
类型,则采用jackson-databind
序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]
。 -
如何指定
topic
的tags
?RocketMQ
的最佳实践中推荐:一个应用尽可能用一个Topic
,消息子类型用tags
来标识,tags可以由应用自由设置。 在使用rocketMQTemplate
发送消息时,通过设置发送方法的destination
参数来设置消息的目的地,destination的格式为topi***ame:tagName,:前面表示topic的名称,后面表示tags名称
。注意:
tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。 -
发送消息时如何设置消息的key?
可以通过重载的xxxSend(String destination, Message<?> msg, ...)
方法来发送消息,指定msg的headers
来完成。示例:Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build(); rocketMQTemplate.send("topic-test", message);
同理还可以根据上面的方式来设置消息的FLAG、WAIT_STORE_MSG_OK以及一些用户自定义的其它头信息。
注意:
在将Spring的Message转化为RocketMQ的Message时,为防止header
信息与RocketMQ
的系统属性冲突,在所有header
的名称前面都统一添加了前缀USERS_
。因此在消费时如果想获取自定义的消息头信息,请遍历头信息中以USERS_
开头的key
即可。 -
消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?
消费者在实现RocketMQListener
接口时,只需要起泛型为MessageExt
即可,这样在onMessage
方法将接收到RocketMQ
原生的MessageExt
消息。@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer2 implements RocketMQListener<MessageExt>{ public void onMessage(MessageExt messageExt) { log.info("received messageExt: {}", messageExt); } }
-
如何指定消费者从哪开始消费消息,或开始消费的位置?
消费者默认开始消费的位置请参考:RocketMQ FAQ
。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener
接口的实现即可。 示例如下:@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(String message) { log.info("received message: {}", message); } @Override public void prepareStart(final DefaultMQPushConsumer consumer) { // set consumer consume message from now consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } }
同理,任何关于
DefaultMQPushConsumer
的更多其它其它配置,都可以采用上述方式来完成。 -
如何发送事务消息?
在客户端,首先用户需要实现RocketMQLocalTransactionListener
接口,并在接口类上注解声明@RocketMQTransactionListener
,实现确认和回查方法;然后再使用资源模板RocketMQTemplate
, 调用方法sendMessageInTransaction()
来进行消息的发布。注意:从RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能设置txProducerGroup、ak、sk,这些值均与对应的RocketMQTemplate保持一致。
-
如何声明不同
name-server
或者其他特定的属性来定义非标的RocketMQTemplate
?
第一步: 定义非标的RocketMQTemplate
使用你需要的属性,可以定义与标准的RocketMQTemplate
不同的nameserver
、groupname
等。如果不定义,它们取全局的配置属性值或默认值。// 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写) @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876" , ... // 定义其他属性,如果有必要。 ) public class ExtRocketMQTemplate extends RocketMQTemplate { //类里面不需要做任何修改 }
第二步: 使用这个非标RocketMQTemplate
@Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上述具体的Spring Bean. private RocketMQTemplate extRocketMQTemplate;
接下来就可以正常使用这个extRocketMQTemplate了。
-
如何使用非标的
RocketMQTemplate
发送事务消息?
首先用户需要实现RocketMQLocalTransactionListener
接口,并在接口类上注解声明@RocketMQTransactionListener
,注解字段的rocketMQTemplateBeanName
指明为非标的RocketMQTemplate
的Bean name
(若不设置则默认为标准的RocketMQTemplate
),比如非标的RocketMQTemplate
Bean name为“extRocketMQTemplate"
,则代码如下:@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, ***mit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, ***mit or unknown return RocketMQLocalTransactionState.***MIT; } }
然后使用
extRocketMQTemplate
调用sendMessageInTransaction()
来发送事务消息。 -
MessageListener
消费端,是否可以指定不同的name-server
而不是使用全局定义的’rocketmq.name-server’
属性值 ?@Service @RocketMQMessageListener( nameServer = "NEW-NAMESERVER-LIST", // 可以使用这个optional属性来指定不同的name-server topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", enableMsgTrace = true, customizedTraceTopic = "my-trace-topic" ) public class MyNameServerConsumer implements RocketMQListener<String> { ... }
参考
https://blog.csdn.***/abu935009066/article/details/121352742
https://blog.csdn.***/u012069313/article/details/122403509