深入理解 ET 与 LT 模式及其在 Reactor 模型中的应用

深入理解 ET 与 LT 模式及其在 Reactor 模型中的应用

目录

一、ET 与 LT 模式的深度对比

1. 水平触发 (LT, Level Triggered)

2. 边缘触发 (ET, Edge Triggered)

3. 性能对比

二、Reactor 模型概述

Reactor 模型的核心组件

单 Reactor 单进程模型架构

三、基于 ET 模式的 Reactor 实现

四、ET 模式 Reactor 的关键技术点

五、总结

在 Linux 网络编程中,epoll 的边缘触发 (ET) 和水平触发 (LT) 模式是实现高效 I/O 多路复用的关键技术。本文将首先对比这两种模式的核心差异,然后详细讲解基于 ET 模式的 Reactor 模型实现,帮助读者理解高性能网络服务器的设计原理。

一、ET 与 LT 模式的深度对比

epoll 作为 Linux 特有的 I/O 多路复用机制,支持两种事件触发模式,它们在事件通知方式上有着本质区别:

1. 水平触发 (LT, Level Triggered)

  • 触发机制:只要文件描述符处于就绪状态(如可读 / 可写),epoll_wait 就会持续返回该事件
  • 使用特点
    • 无需一次性处理完所有数据,可分多次读取
    • 可使用阻塞或非阻塞 I/O
    • 编程简单,不易出错
  • 适用场景连接数不多、数据处理逻辑复杂的场景

2. 边缘触发 (ET, Edge Triggered)

  • 触发机制:仅在文件描述符状态从 "非就绪" 变为 "就绪" 时触发一次事件
  • 使用特点
    • 必须一次性处理完所有可用数据
    • 必须使用非阻塞 I/O
    • 事件通知次数少,效率更高
    • 编程复杂,需处理各种边界情况
  • 适用场景:高并发、大流量的服务器程序

3. 性能对比

指标 LT 模式 ET 模式
事件通知频率
系统调用次数
CPU 利用率 较高 较低
编程复杂度
数据处理要求 宽松 严格

二、Reactor 模型概述

Reactor 模式是一种基于事件驱动的设计模式,其核心思想是通过一个事件循环来管理所有 I/O 事件,当事件发生时将其分发给对应的处理器。

Reactor 模型的核心组件

  1. 事件多路分发器:使用 epoll 等机制监听 I/O 事件
  2. 事件处理器:定义事件处理接口,由具体业务实现
  3. 事件源:如套接字等产生 I/O 事件的实体
  4. 回调函数:事件发生时的处理逻辑

单 Reactor 单进程模型架构

plaintext

+-------------------+
|     事件循环      |<----+
|  (Reactor核心)    |     |
+--------+----------+     |
         |                |
         v                |
+--------+----------+     |
|  事件多路分发器   |     |
|    (epoll)        |     |
+--------+----------+     |
         |                |
         +--------+-------+
                  |
         +--------+--------+
         |                 |
+--------v-------+  +------v------+
| 新连接处理器    |  | 读写事件处理器 |
+----------------+  +-------------+

三、基于 ET 模式的 Reactor 实现

下面实现一个基于 ET 模式的单 Reactor 单进程 TCP 服务器,完整代码如下:

基于ET模式的Reactor实现:

Connection.hpp类

#pragma once

#include <memory>
#include <string>
#include <sys/socket.h>
#include <***i***/in.h>
#include <functional>
#include <cstring>
#include <unistd.h>
#include <iostream>
#include "nocopy.hpp"

class Connection : public std::enable_shared_from_this<Connection>, public nocopy
{
public:
    using Ptr = std::shared_ptr<Connection>;
    using Callback = std::function<void(Connection::Ptr)>;

    Connection(int sockfd, const struct sockaddr_in& addr)
        : sockfd_(sockfd), addr_(addr)
    {
        // 客户端地址转换为字符串
        char ip[I***_ADDRSTRLEN];
        i***_ntop(AF_I***, &addr_.sin_addr, ip, sizeof(ip));
        peerAddr_ = std::string(ip) + ":" + std::to_string(ntohs(addr_.sin_port));
    }

    ~Connection()
    {
        close(sockfd_);
        std::cout << "Connection closed: " << peerAddr_ << std::endl;
    }

    int fd() const { return sockfd_; }
    const std::string& peerAddr() const { return peerAddr_; }

    // 读取数据
    ssize_t read()
    {
        char buf[1024];
        ssize_t n;
        while (true)
        {
            n = ::read(sockfd_, buf, sizeof(buf));
            if (n > 0)
            {
                inBuffer_.append(buf, n);
            }
            // ET模式下,非阻塞读返回-1且errno为EAGAIN表示数据已读完
            else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
            {
                return inBuffer_.size();
            }
            else // 错误或连接关闭
            {
                return -1;
            }
        }
    }

    // 发送数据
    ssize_t write()
    {
        ssize_t n = ::write(sockfd_, outBuffer_.data(), outBuffer_.size());
        if (n > 0)
        {
            outBuffer_.erase(0, n);
            // 如果缓冲区还有数据,需要继续关注可写事件
            if (outBuffer_.empty())
            {
                if (write***pleteCallback_)
                {
                    write***pleteCallback_(shared_from_this());
                }
                return 0;
            }
        }
        return n;
    }

    // 设置回调函数
    void setReadCallback(Callback cb) { readCallback_ = cb; }
    void setWriteCallback(Callback cb) { writeCallback_ = cb; }
    void setCloseCallback(Callback cb) { closeCallback_ = cb; }
    void setErrorCallback(Callback cb) { errorCallback_ = cb; }
    void setWrite***pleteCallback(Callback cb) { write***pleteCallback_ = cb; }

    // 触发回调
    void handleRead() { if (readCallback_) readCallback_(shared_from_this()); }
    void handleWrite() { if (writeCallback_) writeCallback_(shared_from_this()); }
    void handleClose() { if (closeCallback_) closeCallback_(shared_from_this()); }
    void handleError() { if (errorCallback_) errorCallback_(shared_from_this()); }

    // 缓冲区操作
    std::string& inBuffer() { return inBuffer_; }
    std::string& outBuffer() { return outBuffer_; }
    
    void append(const std::string& data) { outBuffer_.append(data); }
    void clearInBuffer() { inBuffer_.clear(); }
    void clearOutBuffer() { outBuffer_.clear(); }

private:
    int sockfd_;
    struct sockaddr_in addr_;
    std::string peerAddr_;
    std::string inBuffer_;  // 输入缓冲区
    std::string outBuffer_; // 输出缓冲区

    Callback readCallback_;
    Callback writeCallback_;
    Callback closeCallback_;
    Callback errorCallback_;
    Callback write***pleteCallback_;
};

Epoll.hpp

#pragma once

#include <vector>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
#include "nocopy.hpp"

class Epoller : public nocopy
{
public:
    Epoller() : epollfd_(epoll_create1(EPOLL_CLOEXEC))
    {
        if (epollfd_ < 0)
        {
            perror("epoll create error");
            exit(1);
        }
    }

    ~Epoller()
    {
        close(epollfd_);
    }

    // 添加事件
    bool addFd(int fd, uint32_t events)
    {
        struct epoll_event ev;
        memset(&ev, 0, sizeof(ev));
        ev.data.fd = fd;
        ev.events = events;  // 支持ET模式: events | EPOLLET
        
        return epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd, &ev) == 0;
    }

    // 修改事件
    bool modFd(int fd, uint32_t events)
    {
        struct epoll_event ev;
        memset(&ev, 0, sizeof(ev));
        ev.data.fd = fd;
        ev.events = events;
        
        return epoll_ctl(epollfd_, EPOLL_CTL_MOD, fd, &ev) == 0;
    }

    // 删除事件
    bool delFd(int fd)
    {
        return epoll_ctl(epollfd_, EPOLL_CTL_DEL, fd, nullptr) == 0;
    }

    // 等待事件
    int wait(int timeoutMs = -1)
    {
        return epoll_wait(epollfd_, events_.data(), events_.size(), timeoutMs);
    }

    struct epoll_event& getEvent(size_t i)
    {
        return events_[i];
    }

    // 调整事件数组大小
    void resizeEvents(size_t size)
    {
        if (events_.size() < size)
        {
            events_.resize(size);
        }
    }

private:
    int epollfd_;
    std::vector<struct epoll_event> events_;
};

Socket.hpp

#pragma once

#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/i***.h>
#include <***i***/in.h>
#include <f***tl.h>
#include "nocopy.hpp"

enum
{
    SocketErr = 2,
    BindErr,
    ListenErr
};

const int backlog = 1024;

class Socket : public nocopy
{
public:
    Socket() : sockfd_(-1) {}
    ~Socket() 
    {
        if (sockfd_ != -1)
        {
            close(sockfd_);
        }
    }

    void create()
    {
        sockfd_ = socket(AF_I***, SOCK_STREAM | SOCK_NONBLOCK, 0);
        if (sockfd_ < 0)
        {
            perror("socket create error");
            exit(SocketErr);
        }
        
        int opt = 1;
        setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
    }

    void bind(uint16_t port)
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_I***;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;

        if (::bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            perror("bind error");
            exit(BindErr);
        }
    }

    void listen()
    {
        if (::listen(sockfd_, backlog) < 0)
        {
            perror("listen error");
            exit(ListenErr);
        }
    }

    int a***ept(struct sockaddr_in* client, socklen_t* len)
    {
        return ::a***ept4(sockfd_, (struct sockaddr*)client, len, SOCK_NONBLOCK);
    }

    int fd() const { return sockfd_; }

    // 设置非阻塞
    void setNonBlock()
    {
        int flag = f***tl(sockfd_, F_GETFL, 0);
        f***tl(sockfd_, F_SETFL, flag | O_NONBLOCK);
    }

private:
    int sockfd_;
};

TcpServer.hpp

#pragma once

#include <unordered_map>
#include <memory>
#include <functional>
#include <iostream>
#include "Socket.hpp"
#include "Epoller.hpp"
#include "Connection.hpp"

class TcpServer : public nocopy
{
public:
    using ConnectionCallback = std::function<void(Connection::Ptr)>;
    using MessageCallback = std::function<void(Connection::Ptr, std::string&)>;

    TcpServer(uint16_t port) : port_(port), epoller_(new Epoller), 
                              listenSocket_(new Socket)
    {
        listenSocket_->create();
        listenSocket_->bind(port);
        listenSocket_->listen();
        // 监听套接字也使用非阻塞模式
        listenSocket_->setNonBlock();
        
        // 注册监听套接字的读事件,使用ET模式
        epoller_->addFd(listenSocket_->fd(), EPOLLIN | EPOLLET);
        
        std::cout << "TcpServer started on port " << port << std::endl;
    }

    ~TcpServer() = default;

    // 启动事件循环
    void start()
    {
        while (true)
        {
            // 调整事件数组大小
            epoller_->resizeEvents(1024);
            // 等待事件发生
            int nready = epoller_->wait(1000); // 超时1秒
            
            for (int i = 0; i < nready; ++i)
            {
                auto& ev = epoller_->getEvent(i);
                int fd = ev.data.fd;
                
                if (fd == listenSocket_->fd())
                {
                    // 处理新连接
                    handleNewConnection();
                }
                else if (ev.events & (EPOLLIN | EPOLLPRI))
                {
                    // 处理读事件
                    handleReadEvent(fd);
                }
                else if (ev.events & EPOLLOUT)
                {
                    // 处理写事件
                    handleWriteEvent(fd);
                }
                else if (ev.events & (EPOLLERR | EPOLLHUP))
                {
                    // 处理错误或断开事件
                    handleErrorEvent(fd);
                }
            }
        }
    }

    // 设置回调函数
    void setConnectionCallback(ConnectionCallback cb) { connectionCallback_ = cb; }
    void setMessageCallback(MessageCallback cb) { messageCallback_ = cb; }
    void setCloseCallback(ConnectionCallback cb) { closeCallback_ = cb; }

private:
    // 处理新连接
    void handleNewConnection()
    {
        struct sockaddr_in clientAddr;
        socklen_t clientLen = sizeof(clientAddr);
        
        while (true)
        {
            // 接受新连接,ET模式下需要一次性处理完所有待接受连接
            int connfd = listenSocket_->a***ept(&clientAddr, &clientLen);
            if (connfd < 0)
            {
                // 没有更多连接可接受
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    break;
                }
                perror("a***ept error");
                return;
            }

            std::cout << "New connection from " << i***_ntoa(clientAddr.sin_addr) 
                      << ":" << ntohs(clientAddr.sin_port) << std::endl;

            // 创建连接对象
            auto conn = std::make_shared<Connection>(connfd, clientAddr);
            
            // 设置连接的回调函数
            conn->setReadCallback([this](Connection::Ptr c) { handleRead(c); });
            conn->setWriteCallback([this](Connection::Ptr c) { handleWrite(c); });
            conn->setCloseCallback([this](Connection::Ptr c) { handleClose(c); });
            conn->setErrorCallback([this](Connection::Ptr c) { handleError(c); });
            conn->setWrite***pleteCallback([this](Connection::Ptr c) {
                // 数据发送完成后,取消写事件监听
                epoller_->modFd(c->fd(), EPOLLIN | EPOLLET);
            });

            // 将连接加入管理
            connections_[connfd] = conn;
            
            // 注册读事件,使用ET模式
            epoller_->addFd(connfd, EPOLLIN | EPOLLET);
            
            // 触发连接建立回调
            if (connectionCallback_)
            {
                connectionCallback_(conn);
            }
        }
    }

    // 处理读事件
    void handleReadEvent(int fd)
    {
        auto it = connections_.find(fd);
        if (it == connections_.end())
        {
            return;
        }

        auto conn = it->second;
        conn->handleRead();
    }

    // 处理写事件
    void handleWriteEvent(int fd)
    {
        auto it = connections_.find(fd);
        if (it == connections_.end())
        {
            return;
        }

        auto conn = it->second;
        conn->handleWrite();
    }

    // 处理错误事件
    void handleErrorEvent(int fd)
    {
        auto it = connections_.find(fd);
        if (it == connections_.end())
        {
            return;
        }

        auto conn = it->second;
        conn->handleError();
        handleClose(fd);
    }

    // 处理连接关闭
    void handleClose(int fd)
    {
        auto it = connections_.find(fd);
        if (it != connections_.end())
        {
            if (closeCallback_)
            {
                closeCallback_(it->second);
            }
            epoller_->delFd(fd);
            connections_.erase(it);
        }
    }

    // 实际的读处理
    void handleRead(Connection::Ptr conn)
    {
        ssize_t n = conn->read();
        if (n > 0)
        {
            // 触发消息处理回调
            if (messageCallback_)
            {
                messageCallback_(conn, conn->inBuffer());
            }
            conn->clearInBuffer();
        }
        else if (n == 0 || (n < 0 && errno != EAGAIN))
        {
            // 连接关闭或错误
            conn->handleClose();
        }
    }

    // 实际的写处理
    void handleWrite(Connection::Ptr conn)
    {
        ssize_t n = conn->write();
        if (n < 0 && errno != EAGAIN)
        {
            conn->handleError();
            conn->handleClose();
        }
    }

    // 处理连接关闭
    void handleClose(Connection::Ptr conn)
    {
        handleClose(conn->fd());
    }

    // 处理错误
    void handleError(Connection::Ptr conn)
    {
        std::cerr << "Error on connection: " << conn->peerAddr() << std::endl;
    }

private:
    uint16_t port_;
    std::unique_ptr<Epoller> epoller_;
    std::unique_ptr<Socket> listenSocket_;
    std::unordered_map<int, Connection::Ptr> connections_; // 管理所有连接

    ConnectionCallback connectionCallback_; // 连接建立回调
    MessageCallback messageCallback_;       // 消息到达回调
    ConnectionCallback closeCallback_;      // 连接关闭回调
};

Main.***

#include "TcpServer.hpp"
#include <iostream>
#include <string>

int main()
{
    // 创建服务器,监听8080端口
    TcpServer server(8080);
    
    // 设置连接建立回调
    server.setConnectionCallback([](Connection::Ptr conn) {
        std::cout << "Connection established: " << conn->peerAddr() << std::endl;
        conn->append("Wel***e to ET Reactor Server!\n");
        // 注册写事件,触发数据发送
        // 在实际应用中,应通过epoller修改事件
    });
    
    // 设置消息处理回调
    server.setMessageCallback([](Connection::Ptr conn, std::string& data) {
        std::cout << "Received from " << conn->peerAddr() << ": " << data << std::endl;
        // 简单回显
        std::string response = "Server received: " + data;
        conn->append(response);
        // 注册写事件
        // 在实际应用中,应通过epoller修改事件
    });
    
    // 设置连接关闭回调
    server.setCloseCallback([](Connection::Ptr conn) {
        std::cout << "Connection closed: " << conn->peerAddr() << std::endl;
    });
    
    // 启动服务器
    server.start();
    
    return 0;
}

nocopy.hpp

#pragma once  

class nocopy  
{  
public:  
    nocopy() = default;   
    nocopy(const nocopy&) = delete;   
    nocopy& operator=(const nocopy&) = delete;   
};

四、ET 模式 Reactor 的关键技术点

  1. 非阻塞 I/O 配合:ET 模式必须与非阻塞 I/O 结合使用,确保能一次性读取 / 写入所有可用数据

  2. 事件处理完整性在 ET 模式下,每次事件触发都需要循环处理直到操作返回 EAGAIN/EWOULDBLOCK,告知程序 “当前没有可用数据” 或 “操作暂时无法完成”

  3. 连接管理:使用哈希表存储所有活跃连接,便于根据文件描述符快速查找对应的连接对象

  4. 缓冲区设计:每个连接维护独立的输入 / 输出缓冲区,解决 TCP 粘包和发送阻塞问题

  5. 边缘触发的新连接处理对于监听套接字,需要在 ET 模式下一次性接受所有待处理的新连接

五、总结

ET 模式相比 LT 模式能显著减少事件通知次数,提高高并发场景下的性能,但也带来了更高的编程复杂度。基于 ET 模式的 Reactor 模型通过高效的事件驱动机制,能够构建出高性能的网络服务器。

在实际应用中,选择 ET 还是 LT 模式需要根据具体场景权衡:ET 模式适合高并发、低延迟的场景,而 LT 模式则适合逻辑简单、对编程复杂度敏感的场景。

本文实现的单 Reactor 单进程模型虽然简单,但展示了 ET 模式 Reactor 的核心原理,在此基础上可以进一步扩展为多 Reactor 多进程 / 线程模型,以充分利用多核 CPU 的性能。

转载请说明出处内容投诉
CSS教程网 » 深入理解 ET 与 LT 模式及其在 Reactor 模型中的应用

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买