事件驱动reactor的原理与实现

一、背景介绍

        在上一篇博客中,详细介绍了多路复用的select/poll/epoll算法,在学完epoll,我们已经能让一个线程同时盯住一百万个 fd,并在数据到达时立刻醒来;可翻开代码,我们依旧面临一堆“体力活”:

        1. epoll_wait 返回后,一大坨 if/else 判断谁就绪;

        2. 拼包、粘包、写不全的缓冲区要自己挪来挪去;

        3. 加新协议?主循环又得继续“垒烟囱”;

        4. 想从单线程改成“主从”或“线程池”?所有逻辑推倒重来。

        即epoll只解决了“通知”的问题,却没有回答“通知来了以后,代码怎么长得好看、好维护、好扩展。于是人们把“通知”这件事再抽象一层,把事件分发、缓冲区管理、生命周期、多线程模型统统封装起来——这就是 Reactor(反应器)模式。它并不替代 epoll,而是站在 epoll 肩上,本文将详细介绍“reactor”框架。


二、reactor原理

2.1 基本定义

        reactor是一种把I/O事件统一侦测、分派给用户回调执行,且线程绝不阻塞在 I/O 上的同步事件驱动设计模式。简单来说就是"事件到来 → 通知 → 回调" 的同步非阻塞网络模型。这里有四个关键词:

  1. 同步 - 读写操作仍在发起线程完成,不借助异步 I/O 内核完成通知

  2. 事件驱动 - 只关注"就绪/可写/错误"事件,事件到才回调

  3. 统一侦测 - 用 select/poll/epoll 等一次性收割所有 fd 事件

  4. 绝不阻塞 - 唯一可能阻塞的点是事件等待;读写本身非阻塞

        Reactor 不是库,也不是框架,而是**"等待-分发-回调"循环**的抽象范式;任何语言、任何系统调用,只要满足上述四条,就称为 Reactor 实现。


2.2 四大核心角色

2.2.1 Handle事件源

  1. 定义:任何能产生事件的“东西”,多数时候它就是那个小小的int fd,但背后藏着一整套“从网卡到进程”的完整事件生产链。

  2. 作用:

    • 给内核( Demultiplexer)提供“监视靶子

    • 给应用(Dispatcher) 提供“身份令牌”(通常就是 fd 值或指针)

  3. 生命周期:创建 → 注册到 Demultiplexer → 事件到达 → 分派 → 处理 → 关闭。


2.2.2 Synchronous Event Demultiplexer(同步事件分拣器)

        Synchronous Event Demultiplexer(同步事件分离器)是 Reactor 模型中真正“让线程睡觉、又负责叫醒”的那一层。它并不处理业务,只干三件事:注册兴趣、阻塞等待、返回就绪列表。简单来说,它的职责就是:“线程你把要盯的 fd 交给我,然后睡觉;有活干了我叫醒你,还把就绪清单放桌上。”在 Linux 上,epoll 就是 Synchronous Event Demultiplexer 的事实标准实现。实现过程如下:

  1. 创建实例

    int epfd = epoll_create1(0);

    实现初始化 就绪链表rdllist和等待队列wq。

  2. 注册兴趣

    struct epoll_event ev = { .events = EPOLLIN, .data.ptr = conn };
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

    内核为每个 fd 生成一个 struct epitem(epoll 的内核私有结构体),并把 epitem.rdllink (就绪链表节点)挂到 rdllist(若已就绪),同时把 epitem.pwqlist (私有等待队列) 与 socket 等待队列wq勾连。

  3. 阻塞等待

    int n = epoll_wait(epfd, events, MAX, -1);

    检查rdllist是否为空,若为 ,设置 TASK_INTERRUPYOBBLE 并 调度出去(CPU 0%)若rdllist 非空 则立即拷贝就绪项到用户空间并返回

  4. 数据到达唤醒链(以 TCP 可读为例)

    网卡 → 软中断 → tcp_rcv() → sock_def_readable()
         → ep_poll_callback() → 把 epitem 插入 rdllist
         → __wake_up(&ep->wq) → 用户态线程恢复

    数据到达→socket 队列非空→ep_poll_callback 把 epitem 挂到就绪链表并唤醒线程,于是 epoll_wait 立即返回,用户回调被执行。


2.2.3 Dispatcher(分派器)——Reactor 的“事件调度总台”

        分派器就是把 Synchronous Event Demultiplexer 返回的就绪事件列表,按 Handle 为单位路由到对应 Event Handler 的函数指针,并保证 O(1) 或 O(就绪数) 复杂度且不阻塞的纯路由层。简单来说就是把内核返回的就绪事件‘快递包裹’,按门牌号(fd)瞬间投送到正确的收件人(回调函数),保证整个 Reactor 循环既不迷路、也不堵车。示例如下:

for (i = 0; i < nready; i++) {
    int fd = events[i].data.fd;
    if (events[i].events & EPOLLIN)
        conn_list[fd].r_action.recv_callback(fd);
    if (events[i].events & EPOLLOUT)
        conn_list[fd].send_callback(fd);
}

2.2.4 Event Handler(事件处理器)——Reactor 里真正“干活的人”

        事件处理器就是将被 Dispatcher 调用的用户态函数/对象,只在 I/O 就绪时运行,必须非阻塞地完成业务逻辑、状态迁移和输出产生。其核心职责分为以下几个部分:

  1. 拼包 - 解决粘包/半包,把字节流拼成完整消息

  2. 算账 - 解码、校验、业务逻辑、状态机推进

  3. 写回 - 把响应写入缓冲区,注册可写事件

  4. 保状态 - 写不完时保留偏移,下次继续;事件兴趣按需翻转


Reactor 四大组件按事件驱动流水线耦合,协作关系如下:

  1. Handle
    Synchronous Event Demultiplexer 注册就绪兴趣;事件到达时内核将其插入就绪队列,成为后续分派的唯一身份令牌。

  2. Synchronous Event Demultiplexer
    阻塞等待内核就绪通知;返回时保证 rdllist 仅包含当前周期内状态变化的 Handle,复杂度 O(1) 于就绪数,零拷贝导出至用户空间。

  3. Dispatcher
    遍历就绪列表,按 Handle 索引调用对应 Event Handler 函数指针;自身无业务逻辑、无阻塞、无锁,路由复杂度 O(就绪数)。

  4. Event Handler
    执行非阻塞 I/O 与业务计算;通过修改缓冲区及事件掩码产生副作用,触发下一次兴趣注册,形成闭环状态机。


三、代码示例

本次示例的完整代码如下所示:

#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <***i***/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>


#include "server.h"


#define CONNECTION_SIZE			1048576 // 1024 * 1024

#define MAX_PORTS			20

#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)





int a***ept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);



int epfd = 0;
struct timeval begin;


//Handle事件源
struct conn conn_list[CONNECTION_SIZE] = {0};
// fd


int set_event(int fd, int event, int flag) {

	if (flag) {  // non-zero add

		struct epoll_event ev;
		ev.events = event;
		ev.data.fd = fd;
		epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

	} else {  // zero mod

		struct epoll_event ev;
		ev.events = event;
		ev.data.fd = fd;
		epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
		
	}
	

}


int event_register(int fd, int event) {

	if (fd < 0) return -1;

	conn_list[fd].fd = fd;
	conn_list[fd].r_action.recv_callback = recv_cb;
	conn_list[fd].send_callback = send_cb;

	memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
	conn_list[fd].rlength = 0;

	memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
	conn_list[fd].wlength = 0;

	set_event(fd, event, 1);
}


// listenfd(sockfd) --> EPOLLIN --> a***ept_cb
int a***ept_cb(int fd) {

	struct sockaddr_in  clientaddr;
	socklen_t len = sizeof(clientaddr);

	int clientfd = a***ept(fd, (struct sockaddr*)&clientaddr, &len);
	//printf("a***ept finshed: %d\n", clientfd);
	if (clientfd < 0) {
		printf("a***ept errno: %d --> %s\n", errno, strerror(errno));
		return -1;
	}
	
	event_register(clientfd, EPOLLIN);  // | EPOLLET

	if ((clientfd % 1000) == 0) {

		struct timeval current;
		gettimeofday(&current, NULL);

		int time_used = TIME_SUB_MS(current, begin);
		memcpy(&begin, &current, sizeof(struct timeval));
		

		printf("a***ept finshed: %d, time_used: %d\n", clientfd, time_used);

	}

	return 0;
}


int recv_cb(int fd) {

	memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH );
	int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
	if (count == 0) { // disconnect
		printf("client disconnect: %d\n", fd);
		close(fd);

		epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinished

		return 0;
	} else if (count < 0) { // 

		printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
		close(fd);
		epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);

		return 0;
	}

	
	conn_list[fd].rlength = count;
	//printf("RECV: %s\n", conn_list[fd].rbuffer);

#if 0 // echo

	conn_list[fd].wlength = conn_list[fd].rlength;
	memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);

	printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);

#elif 0

	http_request(&conn_list[fd]);

#else

	ws_request(&conn_list[fd]);
	
#endif


	set_event(fd, EPOLLOUT, 0);

	return count;
}


int send_cb(int fd) {

#if 0

	http_response(&conn_list[fd]);

#else

	ws_response(&conn_list[fd]);

#endif

	int count = 0;

#if 0
	if (conn_list[fd].status == 1) {
		//printf("SEND: %s\n", conn_list[fd].wbuffer);
		count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
		set_event(fd, EPOLLOUT, 0);
	} else if (conn_list[fd].status == 2) {
		set_event(fd, EPOLLOUT, 0);
	} else if (conn_list[fd].status == 0) {

		if (conn_list[fd].wlength != 0) {
			count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
		}
		
		set_event(fd, EPOLLIN, 0);
	}
#else

	if (conn_list[fd].wlength != 0) {
		count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
	}
	
	set_event(fd, EPOLLIN, 0);

#endif
	//set_event(fd, EPOLLOUT, 0);

	return count;
}



int init_server(unsigned short port) {

	int sockfd = socket(AF_I***, SOCK_STREAM, 0);

	struct sockaddr_in servaddr;
	servaddr.sin_family = AF_I***;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
	servaddr.sin_port = htons(port); // 0-1023, 

	if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
		printf("bind failed: %s\n", strerror(errno));
	}

	listen(sockfd, 10);
	//printf("listen finshed: %d\n", sockfd); // 3 

	return sockfd;

}

int main() {

	unsigned short port = 2000;


	epfd = epoll_create(1);

	int i = 0;

	for (i = 0;i < MAX_PORTS;i ++) {
		
		int sockfd = init_server(port + i);
		
		conn_list[sockfd].fd = sockfd;
		conn_list[sockfd].r_action.recv_callback = a***ept_cb;
		
		set_event(sockfd, EPOLLIN, 1);
	}

	gettimeofday(&begin, NULL);

	while (1) { // mainloop

		struct epoll_event events[1024] = {0};
		int nready = epoll_wait(epfd, events, 1024, -1);

		int i = 0;
		for (i = 0;i < nready;i ++) {

			int connfd = events[i].data.fd;

#if 0
			if (events[i].events & EPOLLIN) {
				conn_list[connfd].r_action.recv_callback(connfd);
			} else if (events[i].events & EPOLLOUT) {
				conn_list[connfd].send_callback(connfd);
			}

#else 
			if (events[i].events & EPOLLIN) {
				conn_list[connfd].r_action.recv_callback(connfd);
			} 

			if (events[i].events & EPOLLOUT) {
				conn_list[connfd].send_callback(connfd);
			}
#endif
		}

	}
	

}


在这段代码中体现了reactor的四大角色在代码中分别体现在:

角色 起始行号 关键符号 / 调用 功能描述
Handle ① 22 conn_list[fd] 下标即 fd 事件源 = fd,O(1) 寻址
Synchronous Event Demultiplexer ② 39 epoll_create(1) 创建内核事件表
② 40-49 set_event() epoll_ctl(ADD/MOD) 注册兴趣
251 epoll_wait(..., -1) 阻塞等待就绪清单
Dispatcher ③ 257-267 for (i = 0; i < nready; i++) 按就绪掩码分派到回调指针
Event Handler ④ 71 a***ept_cb() 新连接事件处理器
④ 98 recv_cb() 读就绪事件处理器
④ 140 send_cb() 写就绪事件处理器

更多知识详情可以查看此链接:https://github.***/0voice

转载请说明出处内容投诉
CSS教程网 » 事件驱动reactor的原理与实现

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买