【IO多路转接】高并发服务器实战:Reactor 框架与 Epoll 机制的封装与设计逻辑



半桔:个人主页

 🔥 个人专栏: 《IO多路转接》《手撕面试算法》《C++从入门到入土》

🔖无论什么样的灾难降临,只要生命还在,生活始终要继续。活着,就是最美丽的事。 《美丽人生》

前言

在高并发成为系统标配的今天,网络编程、中间件开发、分布式通信等场景中,“如何高效处理海量 IO 请求” 始终是开发者绕不开的核心命题。传统 “一连接一线程” 的同步阻塞模型,早已因线程资源耗尽、CPU 上下文切换频繁、内存占用过高等问题,难以应对万级甚至十万级的并发连接;即便引入线程池优化,也无法从根本上解决 “等待 IO 时线程闲置” 的资源浪费困境。​

正是在这样的需求下,基于 “事件驱动” 与 “IO 多路转接” 的 Reactor 模式应运而生 —— 它以 “少量线程监听多 IO、事件触发业务处理” 的核心逻辑,成为解决高并发 IO 的经典架构:小到 ***ty 的网络通信内核、Redis 的事件循环,大到 Nginx 的请求处理框架、Kafka 的消息接收模块,其底层都能看到 Reactor 模式的影子。可以说,理解 Reactor 模式的实现逻辑,是掌握高并发系统设计的 “关键钥匙”。​

本文正是围绕 “Reactor 模式实现” 展开:不局限于抽象原理,而是从底层技术依赖(IO 多路转接调用)切入,一步步拆解事件循环的构建、组件间的协作逻辑,手把手帮助你构建出一个基于Reactor 模式的服务器。

一. Epoll的工作模式

Epoll有两种工作模式:水平触发(Level Triggered,简称 LT)边缘触发(Edge Triggered,简称 ET)。这两种模式的核心差异在于 “何时通知应用程序某个文件描述符(fd)就绪”,直接影响高并发 IO 处理的效率和编程复杂度。

  1. 水平触发(LT):默认模式,“状态持续” 触发:

当一个文件描述符(如 socket)处于就绪状态(例如:有数据可读、可写,或发生异常)时,epoll 会持续通知应用程序,直到该就绪状态被 “消除”(例如:数据被完全读取、缓冲区被写满)。

  1. 边缘触发(ET):“状态变化” 触发,高效但复杂:

epoll 仅在文件描述符的就绪状态发生 “变化瞬间” 通知一次,之后无论该状态是否持续,都不再通知。,即只有在读写资源从没就绪多就绪的时候才会进行通知。

我们通常会认为ET模式的效率更高:

  • ET当资源就绪的时候只会通知一次,并不需要反复通知。并且如果上层没有将数据读取完毕,也不会再进行通知了;
  • 因为ET模式只会进行通知一次,因此其==会倒逼着上层在进行读取时要将数据一次全部取完,这样就可以空出一个更大的接收缓冲区,对方也可以发送更多的。

二. Reactor 服务器

一下我们开始进行基于 Reactor 模式设计的高性能网络服务器,通过 “事件驱动” 和 “IO 多路转接” 技术,高效处理海量并发连接。

2.1 对网络套接字进行封装

关于网络套接字可以查看,我之前写的关于TCP的文章,改内容并不是本文的重点,所以此处直接贴实现代码了:

const std::string defaultip_ = "0.0.0.0";
enum SockErr
{
    SOCKET_Err, 
    BIND_Err,
};

class Sock
{
public:
    Sock(uint16_t port)
        : port_(port),
          listensockfd_(-1)
    {
    }
    void Socket()
    {
        listensockfd_ = socket(AF_I***, SOCK_STREAM, 0);
        if (listensockfd_ < 0)
        {
            Log(Fatal) << "socket fail";
            exit(SOCKET_Err);
        }
        Log(Info) << "socket sucess";
    }
    void Bind()
    {
        struct sockaddr_in server;
        server.sin_family = AF_I***;
        server.sin_port = htons(port_);
        i***_pton(AF_I***, defaultip_.c_str(), &server.sin_addr);
        if (bind(listensockfd_, (struct sockaddr *)&server, sizeof(server)) < 0)
        {
            Log(Fatal) << "bind fail";
            exit(BIND_Err);
        }
        Log(Info) << "bind sucess";
    }
    void Listen()
    {
        if (listen(listensockfd_, 10) < 0)
        {
            Log(Warning) << "listen fail";
        }
        Log(Info) << "listen sucess";
    }
    int A***ept()
    {
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int fd = a***ept(listensockfd_ , (sockaddr*)&client , &len);
        return fd;
    }
    int A***ept(std::string& ip , uint16_t& port)
    {
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int fd = a***ept(listensockfd_ , (sockaddr*)&client , &len);

        port = ntohs(client.sin_port);
        char bufferip[64];
        i***_ntop(AF_I*** , &client.sin_addr , bufferip , sizeof(bufferip) - 1);
        ip = bufferip;

        return fd;
    }
    int Get_fd()
    {
        return listensockfd_;
    }
    ~Sock()
    {
        close(listensockfd_);
    }

private:
    uint16_t port_;
    int listensockfd_;
};

2.2 对Epoll接口进行封装

关于Epoll具体的细节,可以查看之前关于关于Epoll的文章,此处我们直接对封装的接口进行使用:

enum EpollErr
{
    CREAR_Err,
};

class Epoll
{
public:
    Epoll()
    {
        // 创建epoll模型
        _epfd = epoll_create(1);
        if (_epfd < 0)
        {
            Log(Fatal) << "epoll_create fail";
            exit(CREAR_Err);
        }
        Log(Info) << "epoll create sucess ";
    }

    void Add_fd(int fd, uint32_t event)
    {
        // 添加文件描述符到红黑树中
        struct epoll_event epevt;
        epevt.events = event;
        epevt.data.fd = fd;

        if (epoll_ctl(_epfd, EPOLL_CTL_ADD , fd, &epevt) < 0)
        {
            Log(Warning) << "epoll add error : " << strerror(errno);
        }
        Log(Info) << "epoll add sucess , fd : " << fd ;
    }

    void Del_fd(int fd)
    {
        // 删除要进行等待的文件描述符
        if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr) < 0)
        {
            Log(Warning) << "epoll del error : " << strerror(errno);
        }
        Log(Info) << "epoll del sucess  , fd : " << fd;
    }

    void Mod_fd(int fd, uint32_t event)
    {
        // 对文件描述符的事件进行修改
        struct epoll_event epevt;
        epevt.events = event;
        epevt.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &epevt) < 0)
        {
            Log(Warning) << "epoll mod error : " << strerror(errno);
        }
    }

    int Wait(struct epoll_event *ep_array, int max_size, int timeout)
    {
        // 进行等待
        return epoll_wait(_epfd, ep_array, max_size, timeout);
    }

private:
    int _epfd;
};

2.3 设计一个管理连接的类

因为TCP通信传递的是字节流,因此我们无法确定每次获取到的数据是一个有效报文,因此我们需要将所有获取到的数据都先存储起来:

  1. 我们需要一个整形,存储连接对应的文件描述符;
  2. 需要两个缓冲区:输入缓冲区和输出缓冲区;
  3. 当然为将代码的耦合性尽量降低一些,此处我们将不同文件描述符处理读写以及异常事件的方法也放到Connection类中
    这些方法的参数统一都设置为:std::shared_ptr<Connection>来保证当跳转到外界去进行代码的执行时,依旧可以拿到文件描述符的相关资源。
class Connection;
using func_t = std::function<void(std::shared_ptr<Connection>)>;

class Connection
{
public:
    Connection(int fd , func_t recv , func_t sender , func_t exception)
    :_fd(fd) , 
    _Recv(recv) , _Sender(sender) , _Exception(exception)
    {
    }
private:
    int _fd;                 // 对应的文件描述符
    std::string _inbuffer ;  // 输入缓冲区
    std::string _outbuffer;  // 输出缓冲区
public:
    func_t _Recv;             // 处理接收的逻辑
    func_t _Sender;           // 处理发送的逻辑
    func_t _Exception;        // 处理出现异常时的逻辑
};

在该类中,毫无疑问我们在后续需要先缓冲区中进行读写操作:

    std::string& Get_Inbuffer()
    {
        return _inbuffer;
    }
    std::string& Get_Outbuffer()
    {
        return _outbuffer;
    }

    void Add_In(const std::string& mes)
    {
        _inbuffer += mes;
    }
    void Add_Out(const std::string& mes)
    {
        _outbuffer += mes;
    }
    int Get_fd()
    {
        return _fd;
    }

可能后续还需要使用一些操作,在后面再进行补充。

2.4 设计 Reactor服务器 类

  1. 需要一个Sock对象来从网路中获取客户端的连接;
  2. 需要一个Epoll对象来使用epoll多路转接的接口;
  3. 使用一个哈希表来存储每一个文件描述符与之对应的Connection资源,方便我们后面获取一个文件描述符的输入缓冲区和输出缓冲区;
  4. 还需要一个缓冲区,负责接收epoll模型等待结束后返回的就绪队列中的文件描述符信息。
class Rserver
{
	static const int array_num_max = 1024;
public:
    Rserver(uint16_t port)
    :_sock_ptr(new Sock(port)) , 
    _epoll_ptr(new Epoll)
    {}

private:
    std::shared_ptr<Sock> _sock_ptr;
    std::shared_ptr<Epoll> _epoll_ptr;
    std::unordered_map<int , std::shared_ptr<Connection> > _connections;
	struct epoll_event _epl_array[array_num_max];
};

2.5 将文件描述符设置为非阻塞

在ET模式下,我们要保证一次将所有的资源都获取上来,因此我们需要while式的对资源进行读取,这就使得如果没有资源了我们也不能让其堵塞住,因此要将所有文件描述符设置为非阻塞状态

此时使用int f***tl(int fd, int op , ... )接口进行设置:

int SetNoBlock(int fd)
{
    int fl = f***tl(fd, F_GETFL);
    fl |= O_NONBLOCK;
    int n = f***tl(fd, F_SETFL, fl); 
    return n;
}

2.6 所有文件描述符的处理方法

2.6.1 普通文件描述符的处理方法

首先就是普通文件的接收方法:

  1. 将缓冲区中的数据全部读取到connection中;
  2. 调用外界函数判断是否含有一个完成的报文;
  3. 含有完整报文就进行处理。

对于第二步,我们可以先外界开放一个接口,让外界将数据进行处理,将处理好的数据再给我,由服务器进行发送,因此我们在服务端的类中添加一个成员,负责回调

using callback_func = std::function<std::string(std::shared_ptr<Connection>)>;

class Rserver
{
    static const int array_num_max = 1024;
public:
    Rserver(uint16_t port , callback_func Onmessage)
    :_sock_ptr(new Sock(port)) , 
    _epoll_ptr(new Epoll) , 
    _Onmessage(Onmessage)
    {}

private:
    std::shared_ptr<Sock> _sock_ptr;
    std::shared_ptr<Epoll> _epoll_ptr;
    std::unordered_map<int , std::shared_ptr<Connection> > _connections;
    struct epoll_event _epl_array[array_num_max];

    callback_func _Onmessage;           // 负责回调
};

关于普通文件描述符的接受问题,需要注意的就是read的不同返回值进行不同的处理:

    void Recv(std::shared_ptr<Connection> con_ptr)
    {
        // 1. 将缓冲区中的数据全部读取到Connection中
        // 2. 调用外界函数判断是否含有一个完成的报文
        // 3. 先客户端返回结果

        char inbuffer[1024];
        while(1)
        {
            int n = read(con_ptr->Get_fd() , inbuffer , sizeof(inbuffer) - 1);
            if(n > 0)
            {
                // 有数据
                inbuffer[n] = 0;
                con_ptr->Add_In(inbuffer);
            }
            else if(n == 0)
            {
                // 对方关闭了文件 , 断开连接了 
                    
                // 1. 将文件描述符从epoll模型中移除
                // 2. 将文件描述符从哈希表中移除
                // 3. 将文件描述符关闭

                int fd = con_ptr->Get_fd();
                _epoll_ptr->Del_fd(fd);
                _connections.erase(fd);
                close(fd);
                return;
            }
            else
            {
                // 此次有两种情况: 1. 数据读取完了   2. 读取出错了
                if(errno == EAGAIN)  // 读取完了
                {
                    break;     
                } 
                else                 // 出错了
                {
                    // 此处调用文件对应的异常处理
                    con_ptr->_Exception(con_ptr);
                    return;
                }
            }
        }

        std::string ret = _Onmessage(con_ptr);
        con_ptr->Add_Out(ret);
    }

接下来就是编写发送的接口:

思考:对于发送接口是否需要判断,写事件是否就绪???

在大多数时候,写事件都是就绪的;因此如果将其加入到判断中epoll_wait就会频繁的进行返回,会影响效率;所以一般不对写事件加入到等待中,除非写缓冲区满了,此时才将写加入到等待中

  • 在代码中表现为:在调用write接口的时候,实际写入的大小比我字符串要小。
 void Sender(std::shared_ptr<Connection> con_ptr)
    {
        // 进行数据的发送
        // 直接进行发送

        std::string& outbuffer = con_ptr->Get_Outbuffer();
        int fd = con_ptr->Get_fd();
        // 循环式的进行发送
        while(1)
        {
            int n = write(fd , outbuffer.c_str() , outbuffer.size());
            if(n > 0)
            {
                // 1. 将已经发送的数据从字符串中移除
                outbuffer.erase(0 , n);
                if(outbuffer.empty()) break;           // 已经写完了
            }
            else if(n == 0)
            {
                break;
            }
            else
            {
                if(errno == EAGAIN)        // 已经写完了
                    break;
                else                       // 出错了
                {
                    // 此处调用文件对应的异常处理
                    con_ptr->_Exception(con_ptr);
                    return;
                }
            }
        }

        // 判断发送缓冲区中是否还有数据
        if(!outbuffer.empty())
        {
            // 发送缓冲区满了
            _epoll_ptr->Mod_fd(fd , EPOLLIN | EPOLLOUT | EPOLLET);
        }
        else
        {
            // 缓冲区没满 , 不需要对写事件进行检测
            _epoll_ptr->Mod_fd(fd , EPOLLIN | EPOLLET);
        }
    }

最后一步就是对异常情况的处理了:

  1. 打印日志信息;
  2. 将文件描述符从epoll模型从移除;
  3. 将文件描述符从哈希表中移除;
  4. 关闭文件描述符。
void Exception(std::shared_ptr<Connection> con_ptr)
    {
        int fd = con_ptr->Get_fd();
        _epoll_ptr->Del_fd(fd);
        _connections.erase(fd);
        close(fd);
    }

2.6.2 套接字的处理方法

对于套接字来说,只需要负责将建立好的链接拿上来就行了,不需要进行写入和异常处理。

在创建为新的文件描述符创建Connection对象的是时候,我们需要传入可执行对象,但是我们在进行统一接口的时候参数都是std::shared_ptr<Connection>,并且上述的Recv,Sender,Expection都是类成员函数,都有一个隐含的参数this指针,所以对于可调用对象在进行传参的是否要使用bind进行绑定

    void A***ept(std::shared_ptr<Connection> con_ptr)
    {
        // 1. 获取文件描述符
        while (1)
        {
            int newfd = _sock_ptr->A***ept();
            if (newfd >= 0)
            {
                // 有新连接
                // 2. 将文件描述符设置为非阻塞
                // 3. 将文件加入到epoll模型中
                // 4. 将文件描述符加入到哈希表中
                if(SetNoBlock(newfd) < 0)
                {
                    Log(Warning) << "set no block fail";
                    continue;
                }
                _epoll_ptr->Add_fd(newfd , EPOLLIN | EPOLLET);
                std::shared_ptr<Connection> con_ptr(new Connection(newfd ,
                std::bind(&Rserver::Recv , this , std::placeholders::_1), 
                std::bind(&Rserver::Sender , this , std::placeholders::_1), 
                std::bind(&Rserver::Exception , this , std::placeholders::_1)
                ));

                _connections.emplace(newfd , con_ptr);
            }
            else
            {
                if(errno == EAGAIN) break;
                else
                {
                    // 出错了
                    Log(Warning) << "a***ept fail";
                }
            }
        }
    }

2.7 初始化服务器

  1. 创建套接字;
  2. 进行绑定;
  3. 设置监听模式;
  4. 将网络套接字加入到epoll模型中,并创建connection加入到_connections中进行管理;
  5. 在创建Connection对象的时候,我们还需要设计一个套接字的Recv方法.

关于建立好的

    void Init()
    {
        // 1. 创建套接字
        // 2. 进行绑定
        // 3. 设置监听模式
        // 4. 将网络套接字加入到epoll模型中,并创建Connection加入到_connections中进行管理

        _sock_ptr->Socket();
        _sock_ptr->Bind();
        _sock_ptr->Listen();

        int listensock = _sock_ptr->Get_fd();
        SetNoBlock(listensock);
        _epoll_ptr->Add_fd(listensock , EPOLLIN | EPOLLET);
        std::shared_ptr<Connection> conptr(new Connection(listensock, 
            std::bind(&Rserver::A***ept , this , std::placeholders::_1), 
            nullptr, nullptr));
            
        _connections.emplace(listensock, conptr);
        
        // 将IP和端口号设置为可复用的
        int opt = 1;
        setsockopt(listensock , SOL_SOCKET , SO_REUSEADDR | SO_REUSEPORT , &opt , sizeof(opt));
    }

2.8 进行任务派发

因为我们之前已经将每个文件描述符对应的处理方法加入到了Connection对象中了,因此直接进行调用即可。

在进行任务派发的时候有一个细节:可以将异常处理嫁接到读写事件中的异常处理,这样就不需要再单独对异常进行处理了。

    void Dispatcher(int n)
    {
        for (int i = 0; i < n; i++)
        {
            int fd = _epl_array[i].data.fd;
            short events = _epl_array[i].events;
            auto &con_ptr = _connections[fd];

            // 将异常处理, 转化为读写处理
            if (events & EPOLLERR)
            {
                events |= (EPOLLIN | EPOLLOUT);
            }

            if (_connections.count(fd) && con_ptr->_Recv)
            {
                con_ptr->_Recv(con_ptr);
            }
            if (_connections.count(fd) && con_ptr->_Sender)
            {
                con_ptr->_Sender(con_ptr);
            }
        }
    }```

## 服务器的主循环

服务器的主循环就比较简单了,直接进行`epoll_wait`即可,将操作系统中的就绪队列拿到:

```cpp
    void Run()
    {
        while (1)
        {
            int n = _epoll_ptr->Wait(_epl_array, array_num_max, -1);
            if (n > 0)
            {
                Dispatcher(n);
            }
            else if (n == 0)
            {
                Log(Info) << "no message";
            }
            else
            {
                Log(Warning) << "epoll wait fail";
            }
        }
    }

以上就是整个服务器的实现过程了,下面我们对服务器接入一下事件,让服务器能够处理一些业务。

三. 补充

3.1 实现在线计算器

此处我们引入之前:手动私下序列化和换序列化的代码,来实现一个手动计算器:

std::string Onmessge(std::shared_ptr<Connection> con_ptr)
{
    static Calculator cal;
    std::string& inbuffer = con_ptr->Get_Inbuffer();  

    std::string ret = cal(inbuffer);  // 对请求进行处理 , 返回一个序列化后的字符串
    return ret;
}

3.2 引入线程池

对于引入线程池,此代码就需要进行重构了,在Connection对象中我们需要存储一个Server的回指指针,但是此处不能直接使用shared_ptr<>否则会出现循环引用,因此要采用weak_ptr来实现。

但是注意:我们是在类的成员函数中使用其this指针来构建一个sharead_ptr,从而初始化weak_ptr

如果在类的成员函数中,直接通过 this 指针创建新的 shared_ptr,会导致两个独立的 shared_ptr 管理同一个对象,但它们的引用计数是分开的:

  • 原有的 shared_ptr (创建服务器时候的)计数减到 0 时,会释放对象;
  • 新创建的 shared_ptr (this指针创建的)计数减到 0 时,会再次尝试释放已被销毁的对象,导致双重释放(double free) 或未定义行为。

此处我们需要使用enable_shared_from_this<T>继承来进行解决:

  • 当类 T 继承 enable_shared_from_this<T> 后,该类会隐式包含一个 weak_ptr<T> 成员(内部维护)。当 T 的对象被 shared_ptr 管理时,这个 weak_ptr 会与管理该对象的 shared_ptr 共享控制块(记录引用计数的结构)。

此时,通过调用 shared_from_this() 方法,可返回一个指向自身的 shared_ptr<T>,这个新的 shared_ptr 会复用原有的引用计数,避免双重释放。

服务器类定义:

class Rserver : public std::enable_shared_from_this<Loop_Epollserver>
{
public:
	// ......
};

Connection类中增加一个成员:weak_ptr<Rserver> _loop_svr.

对于创建Connection对象部分也要进行修改:

std::shared_ptr<Connection> conptr(new Connection(listensock,shared_from_this(),
                            std::bind(&Rserver::A***ept, this, std::placeholders::_1),
                                nullptr, nullptr));

在第二个实参中,传入this指针来构建Connection中的weak_ptr

关于线程池部分的代码因为文章篇幅就不再叙述了,大家可以自己试试写一下。

转载请说明出处内容投诉
CSS教程网 » 【IO多路转接】高并发服务器实战:Reactor 框架与 Epoll 机制的封装与设计逻辑

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买