目录
一、ET 与 LT 模式的深度对比
1. 水平触发 (LT, Level Triggered)
2. 边缘触发 (ET, Edge Triggered)
3. 性能对比
二、Reactor 模型概述
Reactor 模型的核心组件
单 Reactor 单进程模型架构
三、基于 ET 模式的 Reactor 实现
四、ET 模式 Reactor 的关键技术点
五、总结
在 Linux 网络编程中,epoll 的边缘触发 (ET) 和水平触发 (LT) 模式是实现高效 I/O 多路复用的关键技术。本文将首先对比这两种模式的核心差异,然后详细讲解基于 ET 模式的 Reactor 模型实现,帮助读者理解高性能网络服务器的设计原理。
一、ET 与 LT 模式的深度对比
epoll 作为 Linux 特有的 I/O 多路复用机制,支持两种事件触发模式,它们在事件通知方式上有着本质区别:
1. 水平触发 (LT, Level Triggered)
- 触发机制:只要文件描述符处于就绪状态(如可读 / 可写),epoll_wait 就会持续返回该事件
-
使用特点:
- 无需一次性处理完所有数据,可分多次读取
- 可使用阻塞或非阻塞 I/O
- 编程简单,不易出错
- 适用场景:连接数不多、数据处理逻辑复杂的场景
2. 边缘触发 (ET, Edge Triggered)
- 触发机制:仅在文件描述符状态从 "非就绪" 变为 "就绪" 时触发一次事件
-
使用特点:
- 必须一次性处理完所有可用数据
- 必须使用非阻塞 I/O
- 事件通知次数少,效率更高
- 编程复杂,需处理各种边界情况
- 适用场景:高并发、大流量的服务器程序
3. 性能对比
| 指标 | LT 模式 | ET 模式 |
|---|---|---|
| 事件通知频率 | 高 | 低 |
| 系统调用次数 | 多 | 少 |
| CPU 利用率 | 较高 | 较低 |
| 编程复杂度 | 低 | 高 |
| 数据处理要求 | 宽松 | 严格 |
二、Reactor 模型概述
Reactor 模式是一种基于事件驱动的设计模式,其核心思想是通过一个事件循环来管理所有 I/O 事件,当事件发生时将其分发给对应的处理器。
Reactor 模型的核心组件
- 事件多路分发器:使用 epoll 等机制监听 I/O 事件
- 事件处理器:定义事件处理接口,由具体业务实现
- 事件源:如套接字等产生 I/O 事件的实体
- 回调函数:事件发生时的处理逻辑
单 Reactor 单进程模型架构
plaintext
+-------------------+
| 事件循环 |<----+
| (Reactor核心) | |
+--------+----------+ |
| |
v |
+--------+----------+ |
| 事件多路分发器 | |
| (epoll) | |
+--------+----------+ |
| |
+--------+-------+
|
+--------+--------+
| |
+--------v-------+ +------v------+
| 新连接处理器 | | 读写事件处理器 |
+----------------+ +-------------+
三、基于 ET 模式的 Reactor 实现
下面实现一个基于 ET 模式的单 Reactor 单进程 TCP 服务器,完整代码如下:
基于ET模式的Reactor实现:
Connection.hpp类
#pragma once
#include <memory>
#include <string>
#include <sys/socket.h>
#include <***i***/in.h>
#include <functional>
#include <cstring>
#include <unistd.h>
#include <iostream>
#include "nocopy.hpp"
class Connection : public std::enable_shared_from_this<Connection>, public nocopy
{
public:
using Ptr = std::shared_ptr<Connection>;
using Callback = std::function<void(Connection::Ptr)>;
Connection(int sockfd, const struct sockaddr_in& addr)
: sockfd_(sockfd), addr_(addr)
{
// 客户端地址转换为字符串
char ip[I***_ADDRSTRLEN];
i***_ntop(AF_I***, &addr_.sin_addr, ip, sizeof(ip));
peerAddr_ = std::string(ip) + ":" + std::to_string(ntohs(addr_.sin_port));
}
~Connection()
{
close(sockfd_);
std::cout << "Connection closed: " << peerAddr_ << std::endl;
}
int fd() const { return sockfd_; }
const std::string& peerAddr() const { return peerAddr_; }
// 读取数据
ssize_t read()
{
char buf[1024];
ssize_t n;
while (true)
{
n = ::read(sockfd_, buf, sizeof(buf));
if (n > 0)
{
inBuffer_.append(buf, n);
}
// ET模式下,非阻塞读返回-1且errno为EAGAIN表示数据已读完
else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
{
return inBuffer_.size();
}
else // 错误或连接关闭
{
return -1;
}
}
}
// 发送数据
ssize_t write()
{
ssize_t n = ::write(sockfd_, outBuffer_.data(), outBuffer_.size());
if (n > 0)
{
outBuffer_.erase(0, n);
// 如果缓冲区还有数据,需要继续关注可写事件
if (outBuffer_.empty())
{
if (write***pleteCallback_)
{
write***pleteCallback_(shared_from_this());
}
return 0;
}
}
return n;
}
// 设置回调函数
void setReadCallback(Callback cb) { readCallback_ = cb; }
void setWriteCallback(Callback cb) { writeCallback_ = cb; }
void setCloseCallback(Callback cb) { closeCallback_ = cb; }
void setErrorCallback(Callback cb) { errorCallback_ = cb; }
void setWrite***pleteCallback(Callback cb) { write***pleteCallback_ = cb; }
// 触发回调
void handleRead() { if (readCallback_) readCallback_(shared_from_this()); }
void handleWrite() { if (writeCallback_) writeCallback_(shared_from_this()); }
void handleClose() { if (closeCallback_) closeCallback_(shared_from_this()); }
void handleError() { if (errorCallback_) errorCallback_(shared_from_this()); }
// 缓冲区操作
std::string& inBuffer() { return inBuffer_; }
std::string& outBuffer() { return outBuffer_; }
void append(const std::string& data) { outBuffer_.append(data); }
void clearInBuffer() { inBuffer_.clear(); }
void clearOutBuffer() { outBuffer_.clear(); }
private:
int sockfd_;
struct sockaddr_in addr_;
std::string peerAddr_;
std::string inBuffer_; // 输入缓冲区
std::string outBuffer_; // 输出缓冲区
Callback readCallback_;
Callback writeCallback_;
Callback closeCallback_;
Callback errorCallback_;
Callback write***pleteCallback_;
};
Epoll.hpp
#pragma once
#include <vector>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
#include "nocopy.hpp"
class Epoller : public nocopy
{
public:
Epoller() : epollfd_(epoll_create1(EPOLL_CLOEXEC))
{
if (epollfd_ < 0)
{
perror("epoll create error");
exit(1);
}
}
~Epoller()
{
close(epollfd_);
}
// 添加事件
bool addFd(int fd, uint32_t events)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.data.fd = fd;
ev.events = events; // 支持ET模式: events | EPOLLET
return epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd, &ev) == 0;
}
// 修改事件
bool modFd(int fd, uint32_t events)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.data.fd = fd;
ev.events = events;
return epoll_ctl(epollfd_, EPOLL_CTL_MOD, fd, &ev) == 0;
}
// 删除事件
bool delFd(int fd)
{
return epoll_ctl(epollfd_, EPOLL_CTL_DEL, fd, nullptr) == 0;
}
// 等待事件
int wait(int timeoutMs = -1)
{
return epoll_wait(epollfd_, events_.data(), events_.size(), timeoutMs);
}
struct epoll_event& getEvent(size_t i)
{
return events_[i];
}
// 调整事件数组大小
void resizeEvents(size_t size)
{
if (events_.size() < size)
{
events_.resize(size);
}
}
private:
int epollfd_;
std::vector<struct epoll_event> events_;
};
Socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/i***.h>
#include <***i***/in.h>
#include <f***tl.h>
#include "nocopy.hpp"
enum
{
SocketErr = 2,
BindErr,
ListenErr
};
const int backlog = 1024;
class Socket : public nocopy
{
public:
Socket() : sockfd_(-1) {}
~Socket()
{
if (sockfd_ != -1)
{
close(sockfd_);
}
}
void create()
{
sockfd_ = socket(AF_I***, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (sockfd_ < 0)
{
perror("socket create error");
exit(SocketErr);
}
int opt = 1;
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}
void bind(uint16_t port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_I***;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (::bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)
{
perror("bind error");
exit(BindErr);
}
}
void listen()
{
if (::listen(sockfd_, backlog) < 0)
{
perror("listen error");
exit(ListenErr);
}
}
int a***ept(struct sockaddr_in* client, socklen_t* len)
{
return ::a***ept4(sockfd_, (struct sockaddr*)client, len, SOCK_NONBLOCK);
}
int fd() const { return sockfd_; }
// 设置非阻塞
void setNonBlock()
{
int flag = f***tl(sockfd_, F_GETFL, 0);
f***tl(sockfd_, F_SETFL, flag | O_NONBLOCK);
}
private:
int sockfd_;
};
TcpServer.hpp
#pragma once
#include <unordered_map>
#include <memory>
#include <functional>
#include <iostream>
#include "Socket.hpp"
#include "Epoller.hpp"
#include "Connection.hpp"
class TcpServer : public nocopy
{
public:
using ConnectionCallback = std::function<void(Connection::Ptr)>;
using MessageCallback = std::function<void(Connection::Ptr, std::string&)>;
TcpServer(uint16_t port) : port_(port), epoller_(new Epoller),
listenSocket_(new Socket)
{
listenSocket_->create();
listenSocket_->bind(port);
listenSocket_->listen();
// 监听套接字也使用非阻塞模式
listenSocket_->setNonBlock();
// 注册监听套接字的读事件,使用ET模式
epoller_->addFd(listenSocket_->fd(), EPOLLIN | EPOLLET);
std::cout << "TcpServer started on port " << port << std::endl;
}
~TcpServer() = default;
// 启动事件循环
void start()
{
while (true)
{
// 调整事件数组大小
epoller_->resizeEvents(1024);
// 等待事件发生
int nready = epoller_->wait(1000); // 超时1秒
for (int i = 0; i < nready; ++i)
{
auto& ev = epoller_->getEvent(i);
int fd = ev.data.fd;
if (fd == listenSocket_->fd())
{
// 处理新连接
handleNewConnection();
}
else if (ev.events & (EPOLLIN | EPOLLPRI))
{
// 处理读事件
handleReadEvent(fd);
}
else if (ev.events & EPOLLOUT)
{
// 处理写事件
handleWriteEvent(fd);
}
else if (ev.events & (EPOLLERR | EPOLLHUP))
{
// 处理错误或断开事件
handleErrorEvent(fd);
}
}
}
}
// 设置回调函数
void setConnectionCallback(ConnectionCallback cb) { connectionCallback_ = cb; }
void setMessageCallback(MessageCallback cb) { messageCallback_ = cb; }
void setCloseCallback(ConnectionCallback cb) { closeCallback_ = cb; }
private:
// 处理新连接
void handleNewConnection()
{
struct sockaddr_in clientAddr;
socklen_t clientLen = sizeof(clientAddr);
while (true)
{
// 接受新连接,ET模式下需要一次性处理完所有待接受连接
int connfd = listenSocket_->a***ept(&clientAddr, &clientLen);
if (connfd < 0)
{
// 没有更多连接可接受
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
break;
}
perror("a***ept error");
return;
}
std::cout << "New connection from " << i***_ntoa(clientAddr.sin_addr)
<< ":" << ntohs(clientAddr.sin_port) << std::endl;
// 创建连接对象
auto conn = std::make_shared<Connection>(connfd, clientAddr);
// 设置连接的回调函数
conn->setReadCallback([this](Connection::Ptr c) { handleRead(c); });
conn->setWriteCallback([this](Connection::Ptr c) { handleWrite(c); });
conn->setCloseCallback([this](Connection::Ptr c) { handleClose(c); });
conn->setErrorCallback([this](Connection::Ptr c) { handleError(c); });
conn->setWrite***pleteCallback([this](Connection::Ptr c) {
// 数据发送完成后,取消写事件监听
epoller_->modFd(c->fd(), EPOLLIN | EPOLLET);
});
// 将连接加入管理
connections_[connfd] = conn;
// 注册读事件,使用ET模式
epoller_->addFd(connfd, EPOLLIN | EPOLLET);
// 触发连接建立回调
if (connectionCallback_)
{
connectionCallback_(conn);
}
}
}
// 处理读事件
void handleReadEvent(int fd)
{
auto it = connections_.find(fd);
if (it == connections_.end())
{
return;
}
auto conn = it->second;
conn->handleRead();
}
// 处理写事件
void handleWriteEvent(int fd)
{
auto it = connections_.find(fd);
if (it == connections_.end())
{
return;
}
auto conn = it->second;
conn->handleWrite();
}
// 处理错误事件
void handleErrorEvent(int fd)
{
auto it = connections_.find(fd);
if (it == connections_.end())
{
return;
}
auto conn = it->second;
conn->handleError();
handleClose(fd);
}
// 处理连接关闭
void handleClose(int fd)
{
auto it = connections_.find(fd);
if (it != connections_.end())
{
if (closeCallback_)
{
closeCallback_(it->second);
}
epoller_->delFd(fd);
connections_.erase(it);
}
}
// 实际的读处理
void handleRead(Connection::Ptr conn)
{
ssize_t n = conn->read();
if (n > 0)
{
// 触发消息处理回调
if (messageCallback_)
{
messageCallback_(conn, conn->inBuffer());
}
conn->clearInBuffer();
}
else if (n == 0 || (n < 0 && errno != EAGAIN))
{
// 连接关闭或错误
conn->handleClose();
}
}
// 实际的写处理
void handleWrite(Connection::Ptr conn)
{
ssize_t n = conn->write();
if (n < 0 && errno != EAGAIN)
{
conn->handleError();
conn->handleClose();
}
}
// 处理连接关闭
void handleClose(Connection::Ptr conn)
{
handleClose(conn->fd());
}
// 处理错误
void handleError(Connection::Ptr conn)
{
std::cerr << "Error on connection: " << conn->peerAddr() << std::endl;
}
private:
uint16_t port_;
std::unique_ptr<Epoller> epoller_;
std::unique_ptr<Socket> listenSocket_;
std::unordered_map<int, Connection::Ptr> connections_; // 管理所有连接
ConnectionCallback connectionCallback_; // 连接建立回调
MessageCallback messageCallback_; // 消息到达回调
ConnectionCallback closeCallback_; // 连接关闭回调
};
Main.***
#include "TcpServer.hpp"
#include <iostream>
#include <string>
int main()
{
// 创建服务器,监听8080端口
TcpServer server(8080);
// 设置连接建立回调
server.setConnectionCallback([](Connection::Ptr conn) {
std::cout << "Connection established: " << conn->peerAddr() << std::endl;
conn->append("Wel***e to ET Reactor Server!\n");
// 注册写事件,触发数据发送
// 在实际应用中,应通过epoller修改事件
});
// 设置消息处理回调
server.setMessageCallback([](Connection::Ptr conn, std::string& data) {
std::cout << "Received from " << conn->peerAddr() << ": " << data << std::endl;
// 简单回显
std::string response = "Server received: " + data;
conn->append(response);
// 注册写事件
// 在实际应用中,应通过epoller修改事件
});
// 设置连接关闭回调
server.setCloseCallback([](Connection::Ptr conn) {
std::cout << "Connection closed: " << conn->peerAddr() << std::endl;
});
// 启动服务器
server.start();
return 0;
}
nocopy.hpp
#pragma once
class nocopy
{
public:
nocopy() = default;
nocopy(const nocopy&) = delete;
nocopy& operator=(const nocopy&) = delete;
};
四、ET 模式 Reactor 的关键技术点
-
非阻塞 I/O 配合:ET 模式必须与非阻塞 I/O 结合使用,确保能一次性读取 / 写入所有可用数据
-
事件处理完整性:在 ET 模式下,每次事件触发都需要循环处理直到操作返回 EAGAIN/EWOULDBLOCK,告知程序 “当前没有可用数据” 或 “操作暂时无法完成”
-
连接管理:使用哈希表存储所有活跃连接,便于根据文件描述符快速查找对应的连接对象
-
缓冲区设计:每个连接维护独立的输入 / 输出缓冲区,解决 TCP 粘包和发送阻塞问题
-
边缘触发的新连接处理:对于监听套接字,需要在 ET 模式下一次性接受所有待处理的新连接
五、总结
ET 模式相比 LT 模式能显著减少事件通知次数,提高高并发场景下的性能,但也带来了更高的编程复杂度。基于 ET 模式的 Reactor 模型通过高效的事件驱动机制,能够构建出高性能的网络服务器。
在实际应用中,选择 ET 还是 LT 模式需要根据具体场景权衡:ET 模式适合高并发、低延迟的场景,而 LT 模式则适合逻辑简单、对编程复杂度敏感的场景。
本文实现的单 Reactor 单进程模型虽然简单,但展示了 ET 模式 Reactor 的核心原理,在此基础上可以进一步扩展为多 Reactor 多进程 / 线程模型,以充分利用多核 CPU 的性能。