🌈 个人主页:Zfox_
🔥 系列专栏:C++框架/库
🔥 介绍
RabbitMQ 是一个开源的消息代理软件,基于 AMQP(高级消息队列协议)实现,广泛用于构建分布式系统中的消息传递和通信。
RabbitMQ 就是一个 “传话筒”(中间转发),它帮助不同的软件系统之间传递消息。比如,你在网上买东西,系统会通过 RabbitMQ 把你的订单信息传递给物流系统,这样物流系统就知道要发货了。
它为啥厉害
- 能扛住大流量:就算很多人同时买东西,它也不会卡住。
-
消息不会丢:
即使系统突然坏了,消息也能保存下来,等系统好了再继续传。 - 能灵活配置:你可以根据需要设置消息怎么传、传给谁。
- 支持多种语言:无论是用 Java、Python 还是其他语言开发的系统,都能和它配合。
它怎么工作
- 生产者:就是发消息的人,比如你买东西的系统。
- 消费者:就是收消息的人,比如物流系统。
-
交换机(Exchange):
就像一个分发中心,决定消息该发给谁 - 队列(Queue):消息的临时存放地,消费者从这里取消息。
-
绑定(Binding):
告诉交换机,哪些消息要发到哪个队列
它有哪些模式
- 简单模式:一个生产者,一个队列,一个消费者,最简单直接。
- 工作队列模式:多个消费者一起干活,消息会被轮流分配给他们。
- 发布订阅模式:消息会被广播给所有订阅的消费者。
- 路由模式:根据消息的标签(路由键)发给特定的队列。
- 主题模式:根据消息的模式匹配,发给符合条件的队列。
它能干啥
- 异步处理:比如你上传文件,系统可以用 RabbitMQ 异步处理,你不用等很久。
- 解耦:让不同的系统之间不直接联系,减少互相影响。
- 流量削峰:在高流量时,它能缓冲请求,保护后端系统。
- 日志处理:收集分布式系统的日志,方便分析。
- 任务调度:比如定时任务,可以用 RabbitMQ 实现。
🔥 安装 RabbitMQ
sudo apt install rabbitmq-server
# 启动服务
sudo systemctl start rabbitmq-server.service
# 查看服务状态
sudo systemctl status rabbitmq-server.service
# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个 administrator 用户,才可以做为远程登录和发表订阅消息:
# 添加用户
sudo rabbitmqctl add_user root 123456
# 设置用户 tag
sudo rabbitmqctl set_user_tags root administrator
# 设置用户权限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自带了 web 管理界面,执行下面命令开启
sudo rabbitmq-plugins enable rabbitmq_management
访问 webUI 界面, 默认端口为 15672
至此 rabbitmq 安装成功
🔥 安装 RabbitMQ 客户端库
如果需要在其他应用程序中使用 RabbitMQ,则需要安装 RabbitMQ 的客户端库。
🦋 安装 RabbitMQ 的 C++客户端库
- C 语言库 : https://github.***/alanxz/rabbitmq-c
- C++库 : https://github.***/CopernicaMarketingSoftware/AMQP-CPP/tree/master
我们这里使用 AMQP-CPP 库来编写客户端程序
🦋 安装 AMQP-CPP
sudo apt install libev-dev #libev 网络库组件
git clone https://github.***/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
make install
安装报错:
In file included from linux_tcp/openssl.h:20,
from linux_tcp/openssl.cpp:12:
/usr/include/openssl/ssl.h:991:1: error: expected constructor, destructor, or type conversion before ‘typedef’
991 | typedef enum {
| ^~~~~~~
/usr/include/openssl/ssl.h:1042:3: error: ‘OSSL_HANDSHAKE_STATE’ does not name a type; did you mean ‘SSL_CB_HANDSHAKE_START’?
1042 | } OSSL_HANDSHAKE_STATE;
| ^~~~~~~~~~~~~~~~~~~~
| SSL_CB_HANDSHAKE_START
/usr/include/openssl/ssl.h:1888:1: error: expected constructor, destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’
1888 | DEPRECATEDIN_1_1_0(__owur const SSL_METHOD *TLSv1_server_method(void))
| ^~~~~~~~~~~~~~~~~~
/usr/include/openssl/ssl.h:2007:8: error: ‘OSSL_HANDSHAKE_STATE’ does not name a type; did you mean ‘SSL_CB_HANDSHAKE_START’?
2007 | __owur OSSL_HANDSHAKE_STATE SSL_get_state(const SSL *ssl);
| ^~~~~~~~~~~~~~~~~~~~
| SSL_CB_HANDSHAKE_START
In file included from linux_tcp/openssl.h:21,
from linux_tcp/openssl.cpp:12:
/usr/include/openssl/err.h:261:1: error: expected constructor, destructor, or type conversion before ‘DEPRECATEDIN_1_0_0’
261 | DEPRECATEDIN_1_0_0(void ERR_remove_state(unsigned long pid))
| ^~~~~~~~~~~~~~~~~~
make[1]: *** [Makefile:52: linux_tcp/openssl.o] Error 1
make[1]: Leaving directory '/root/install/AMQP-CPP/src'
make: *** [Makefile:9: all] Error 2
这种错误,表示 ssl 版本出现问题
解决方案:卸载当前的 ssl 库,重新进行修复安装
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev
sudo apt --fix-broken install
修复后,重新进行 make
ev 相关接口:
🔥 AMQP-CPP 库的简单使用
🦋 介绍
- AMQP-CPP 是用于与 RabbitMq 消息中间件通信的 c++库。它能解析从 RabbitMq 服务发送来的数据,也可以生成发向 RabbitMq 的数据包。 AMQP-CPP 库不会向 RabbitMq 建立网络连接,所有的网络 io 由用户完成。
- 当然, AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不用自己实现网络 io,我们也可以选择 libevent、 libev、 libuv、 asio 等异步通信组件,需要手动安装对应的组件。
- AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。
- 注意:它需要 c++17 的支持。
🦋 使用
AMQP-CPP 的使用有两种模式:
- 使用默认的 TCP 模块进行网络通信
- 使用扩展的 libevent、 libev、 libuv、 asio 异步通信组件进行通信
🎀 TCP 模式
- 实现一个类继承自 AMQP::TcpHandler 类, 它负责网络层的 TCP 连接
- 重写相关函数, 其中必须重写 monitor 函数
- 在 monitor 函数中需要实现的是将 fd 放入 eventloop(select、 epoll)中监控, 当 fd 可写可读就绪之后, 调用 AMQP-CPP 的 connection->process(fd, flags)方法
🎀 扩展模式
以 libev 为例, 我们不必要自己实现 monitor 函数, 可以直接使用 AMQP::LibEvHandler
🔥 常用类与接口介绍
🎀 Channel
channel 是一个虚拟连接,一个连接上可以建立多个通道。并且所有的 RabbitMq 指令都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel。因为所有操作是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred 类,可以使用它安装处理函数。
namespace AMQP
{
/**
* Generic callbacks that are used by many deferred objects
*/
using Su***essCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char
*message)>;
using FinalizeCallback = std::function<void()>;
/**
* Declaring and deleting a queue
*/
using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
using MessageCallback = std::function<void(
const Message &message,
uint64_t deliveryTag,
bool redelivered)>;
// 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用
AckCallback using AckCallback = std::function<void(
uint64_t deliveryTag,
bool multiple)>;
// 使用确认包裹通道时,当消息被 ack/nacked 时,会调用这些回调
using PublishAckCallback = std::function<void()>;
using PublishNackCallback = std::function<void()>;
using PublishLostCallback = std::function<void()>;
class Channel
{
Channel(Connection *connection);
bool connected()
/**
*声明交换机
*如果提供了一个空名称,则服务器将分配一个名称。
*以下 flags 可用于交换机:
*
*-durable 持久化,重启后交换机依然有效
*-autodelete 删除所有连接的队列后,自动删除交换
*-passive 仅被动检查交换机是否存在
*-internal 创建内部交换
*
*@param name 交换机的名称
*@param-type 交换类型
enum ExchangeType
{
fanout, 广播交换,绑定的队列都能拿到消息
direct, 直接交换,只将消息交给 routingkey 一致的队列
topic, 主题交换,将消息交给符合 bindingkey 规则的队
列
headers,
consistent_hash,
message_deduplication
};
*@param flags 交换机标志
*@param arguments 其他参数
*
*此函数返回一个延迟处理程序。可以安装回调
using onSu***ess(), onError() and onFinalize() methods.
*/
Deferred &declareExchange(
const std::string_view &name,
ExchangeType type,
int flags,
const Table &arguments)
/**
*声明队列
*如果不提供名称,服务器将分配一个名称。
*flags 可以是以下值的组合:
*
*-durable 持久队列在代理重新启动后仍然有效
*-autodelete 当所有连接的使用者都离开时,自动删除队列
*-passive 仅被动检查队列是否存在
*-exclusive 队列仅存在于此连接,并且在连接断开时自动删除
*
*@param name 队列的名称
*@param flags 标志组合
*@param arguments 可选参数
*
*此函数返回一个延迟处理程序。可以安装回调
*使用 onSu***ess()、 onError()和 onFinalize()方法。
*
Deferred &onError(const char *message)
*
*可以安装的 onSu***ess()回调应该具有以下签名:
void myCallback(const std::string &name,
uint32_t messageCount,
uint32_t consumerCount);
例如:
channel.declareQueue("myqueue").onSu***ess(
[](const std::string &name,
uint32_t messageCount,
uint32_t consumerCount) {
std::cout << "Queue '" << name << "' ";
std::cout << "has been declared with ";
std::cout << messageCount;
std::cout << " messages and ";
std::cout << consumerCount;
std::cout << " consumers" << std::endl;
* });
*/
DeferredQueue &declareQueue(
const std::string_view &name,
int flags,
const Table &arguments)
/**
*将队列绑定到交换机
*
*@param exchange 源交换机
*@param queue 目标队列
*@param routingkey 路由密钥
*@param arguments 其他绑定参数
*
*此函数返回一个延迟处理程序。可以安装回调
*使用 onSu***ess()、 onError()和 onFinalize()方法。
*/
Deferred &bindQueue(
const std::string_view &exchange,
const std::string_view &queue,
const std::string_view &routingkey,
const Table &arguments)
/**
*将消息发布到 exchange
*您必须提供交换机的名称和路由密钥。
然后, RabbitMQ 将尝试将消息发送到一个或多个队列。
使用可选的 flags 参数,可以指定如果消息无法路由到队列时应该发生
的情况。
默认情况下,不可更改的消息将被静默地丢弃。
*
*如果设置了'mandatory'或'immediate'标志,
则无法处理的消息将返回到应用程序。
在开始发布之前,请确保您已经调用了 recall() -方法,
并设置了所有适当的处理程序来处理这些返回的消息。
*
*可以提供以下 flags:
*
*-mandatory 如果设置,服务器将返回未发送到队列的消息
*-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
*@param exchange 要发布到的交易所
*@param routingkey 路由密钥
*@param envelope 要发送的完整信封
*@param message 要发送的消息
*@param size 消息的大小
*@param flags 可选标志
*/
bool publish(
const std::string_view &exchange,
const std::string_view &routingKey,
const std::string &message,
int flags = 0)
/**
*告诉 RabbitMQ 服务器我们已准备好使用消息-也就是订阅队列消息
*
*调用此方法后, RabbitMQ 开始向客户端应用程序传递消息。
consumer tag 是一个字符串标识符,
如果您以后想通过 channel:: cancel()调用停止它,
可以使用它来标识使用者。
*如果您没有指定使用者 tag,服务器将为您分配一个。
*
*支持以下 flags:
*
*-nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
*-noack 如果设置了,则不必对已消费的消息进行确认
*-exclusive 请求独占访问,只有此使用者可以访问队列
*
*@param queue 您要使用的队列
*@param tag 将与此消费操作关联的消费者标记
*@param flags 其他标记
*@param arguments 其他参数
*
*此函数返回一个延迟处理程序。
可以使用 onSu***ess()、 onError()和 onFinalize()方法安装回
调。
可以安装的 onSu***ess()回调应该具有以下格式:
void myCallback(const std:: string_view&tag);
样例:
channel.consume("myqueue").onSu***ess(
[](const std::string_view& tag) {
std::cout << "Started consuming under tag ";
std::cout << tag << std::endl;
});
*/
DeferredConsumer &consume(
const std::string_view &queue,
const std::string_view &tag,
int flags,
const Table &arguments)
/**
*确认接收到的消息
*
*当在 DeferredConsumer:: onReceived()方法中接收到消息时,
必须确认该消息,
以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。
*
*支持以下标志:
*
*-多条确认多条消息:之前传递的所有未确认消息也会得到确认
*
*@param deliveryTag 消息的唯一 delivery 标签
*@param flags 可选标志
*@return bool
*/
bool ack(uint64_t deliveryTag, int flags = 0)
};
class DeferredConsumer
{
/*
注册一个回调函数,该函数在消费者启动时被调用。
void onSu***ess(const std::string &consumertag)
*/
DeferredConsumer &onSu***ess(const ConsumeCallback &callback)
/*
注册回调函数,用于接收到一个完整消息的时候被调用
void MessageCallback(const AMQP::Message &message,
uint64_t deliveryTag, bool redelivered)
*/
DeferredConsumer &onReceived(const MessageCallback &callback)
/* Alias for onReceived() */
DeferredConsumer &onMessage(const MessageCallback &callback)
/*
注册要在服务器取消消费者时调用的函数
void CancelCallback(const std::string &tag)
*/
DeferredConsumer &onCancelled(const CancelCallback &callback)
};
class Message : public Envelope
{
const std::string &exchange()
const std::string &routingkey(): q
};
class Envelope : public MetaData
{
const char *body()
uint64_t bodySize()
};
}
🎀 ev
typedef struct ev_async
{
EV_WATCHER(ev_async)
EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{
EVBREAK_CANCEL = 0, /* undo unloop */
EVBREAK_ONE = 1, /* unloop once */
EVBREAK_ALL = 2 /* unloop all loops */
};
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0))
#define EV_DEFAULT ev_default_loop(0)
int ev_run(struct ev_loop *loop);
/* break out of the loop */
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents)
void ev_async_init(ev_async *w, callback cb);
void ev_async_start(struct ev_loop *loop, ev_async *w);
void ev_async_send(struct ev_loop *loop, ev_async *w);
第三方库链接
g++ -o example example.cpp -lamqpcpp -lev
二次封装思想:
在项目中使用 rabbitmq 的时候,我们目前只需要交换机与队列的直接交换,实现一台主机将消息发布给另一台主机进行处理的功能,因此在这里可以对 mq 的操作进行简单的封装,使 mq 的操作在项目中更加简便:
封装一个 MQClient:
- 提供声明指定交换机与队列,并进行绑定的功能;
- 提供向指定交换机发布消息的功能
- 提供订阅指定队列消息,并设置回调函数进行消息消费处理的功能
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include "logger.hpp"
class MQClient {
public:
using MessageCallback = std::function<void(const char*, size_t)>;
using ptr = std::shared_ptr<MQClient>;
MQClient(const std::string &user, const std::string &passwd, const std::string host)
{
_loop = EV_DEFAULT;
_handler = std::make_unique<AMQP::LibEvHandler>(_loop);
//amqp://root:123456@127.0.0.1:5672/
std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
AMQP::Address address(url);
_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());
_loop_thread = std::thread([this]() {
ev_run(_loop, 0);
});
}
~MQClient() {
ev_async_init(&_async_watcher, watcher_callback);
ev_async_start(_loop, &_async_watcher);
ev_async_send(_loop, &_async_watcher);
_loop_thread.join();
_loop = nullptr;
}
void declare***ponents(const std::string &exchange, const std::string &queue,
const std::string &routing_key = "routing_key",
AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct) {
// 声明交换机
_channel->declareExchange(exchange, echange_type)
.onError([](const char *message) {
LOG_ERROR("声明交换机失败:{}", message);
exit(0);
})
.onSu***ess([exchange](){
LOG_ERROR("{} 交换机创建成功!", exchange);
});
// 声明队列
_channel->declareQueue(queue)
.onError([](const char *message) {
LOG_ERROR("声明队列失败:{}", message);
exit(0);
})
.onSu***ess([queue](){
LOG_ERROR("{} 队列创建成功!", queue);
});
// 针对交换机和队列进行绑定
_channel->bindQueue(exchange, queue, routing_key)
.onError([exchange, queue](const char *message) {
LOG_ERROR("{} - {} 绑定失败:", exchange, queue);
exit(0);
})
.onSu***ess([exchange, queue, routing_key](){
LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, routing_key);
});
}
// routing_key 指定队列
bool publish(const std::string &exchange, const std::string &msg, const std::string &routing_key = "routing_key") {
LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
bool ret = _channel->publish(exchange, routing_key, msg);
if (ret == false) {
LOG_ERROR("{} 发布消息失败:", exchange);
return false;
}
return true;
}
void consume(const std::string &queue, const MessageCallback &cb) {
LOG_DEBUG("开始订阅 {} 队列消息!", queue);
_channel->consume(queue, "consume-tag") //返回值 DeferredConsumer
.onReceived([this, cb](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
cb(message.body(), message.bodySize());
// 消息确认
_channel->ack(deliveryTag);
})
.onError([queue](const char *message) {
LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
exit(0);
});
}
private:
static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
ev_break(loop, EVBREAK_ALL);
}
private:
struct ev_async _async_watcher;
struct ev_loop *_loop;
std::unique_ptr<AMQP::LibEvHandler> _handler;
std::unique_ptr<AMQP::TcpConnection> _connection;
std::unique_ptr<AMQP::TcpChannel> _channel;
std::thread _loop_thread;
};
🔥 共勉
😋 以上就是我对 【C++组件】RabbitMq 安装与使用 的理解, 觉得这篇博客对你有帮助的,可以点赞收藏关注支持一波~ 😉