%% # 纲要 > 主干纲要、Hint/线索/路标 # Q&A #### 已明确 #### 待明确 > 当下仍存有的疑惑 **❓<font color="#c0504d"> 有什么问题?</font>** %% # 基于事件的并发 ⭐ 「**基于事件的并发**」是一种高效的**并发编程模型**,通过 **==非阻塞 I/O==** 和 **==事件驱动机制==** 来处理大量并发任务,而不依赖多线程或多进程,**避免多线程并发中的同步问题**,特别适用于**处理 ==I/O 密集型任务==**(如网络服务器处理网络请求,文件 I/O 等)。 常见应用场景: - **高并发服务器**:如 `Nginx`,采用基于事件的并发模型,**可处理大量并发连接**,而不需要为每个连接创建独立的线程。 - **网络编程**:例如 `Node.js`,基于事件驱动的编程模型,适用于处理大量的**异步 I/O 请求**。 - **图形用户界面应用**:GUI 框架中常使用事件循环来处理用户的点击、键盘输入等事件,如 Qt、Tkinker - **游戏开发**:游戏引擎通过事件循环来处理用户输入、动画帧更新等。 ### 基于事件的并发的特点 - **==基于事件循环==**:程序**只运行单个线程**,其上**运行事件循环**,**持续监听事件并处理**。 - **==非阻塞 I/O==**(异步 I/O):基于事件的并发机制 **==不允许阻塞 I/O==**,要求必须使用 "**异步 I/O**" 进行事件处理。 - **回调机制**:当事件发生时,**调用与事件关联的回调函数**进行处理。 优点: - **高效的 I/O 处理**:非阻塞 I/O 可以更高效地处理大量的并发请求,特别是 I/O 密集型任务(如网络服务器)。 - **低开销**:由于基于事件的并发不需要为每个任务创建线程或进程,所以内存和 CPU 资源开销较小。 - **避免复杂的线程同步**:事件驱动的模型避免了多线程编程中常见的竞争条件、锁和死锁问题。 缺点: - **代码复杂度**:大量使用回调函数和状态机可能导致代码难以理解和维护,特别是在存在多个嵌套回调时(即“回调地狱”)。 - **不适合 CPU 密集型任务**:基于事件的模型非常适合 I/O 密集型任务,但对于 CPU 密集型任务,由于它在一个线程中运行,可能会导致性能瓶颈。 ### 基于事件的并发的工作流程 **==事件循环==**(event loop)是核心,伪代码如下: ```c while (1) { // 事件循环 events = getEvents(); // 接收事件 for (e in events) processEvent(e); // 调用事件处理程序, 响应事件. } ``` ### 工作流程 1. **事件注册**:程序将预期 "**事件**" 及对应的 "**事件处理函数**" 注册到事件循环总 2. **事件监听**:"事件循环" 不断地等待事件发生(通常是 I/O 完成,用户输入等) 3. **事件分发与处理**:当事件发生时,**事件循环会调用注册的回调函数**来处理该事件。 4. **继续等待**:处理完当前事件后,**程序不会阻塞**,而是继续监听其他事件。 <br><br><br> # 高性能并发模式 两种常见的**高性能网络并发设计模式**(即关于 "**事件处理**" 的设计模式): - 「Reactor 模式」:**==非阻塞 I/O==** + **==I/O 多路复用==** (Non-blocking & I/O multiplexing) - 「Proactor 模式」:**==异步 I/O==** + I/O 多路复用 ## Reactor 模式(反映堆模式) 本质:**I/O 多路复用 + 非阻塞 I/O**,采用 "**面向对象**" 的方式进行封装。 Reactor 模式中的主要组件: - **==Reactor==**(**事件多路分发器**): 负责**监听 I/O 事件**、使用 I/O 多路复用 API (如 `epoll`)来**等待事件发生**,将发生事件**分发给相应的处理器处理**。 - **==Handler== (事件处理器)**:负责**处理事件**,根据具体事件进行读、写、**业务处理**等。 - **Acceptor**(**资源管理器**):负责**建立连接** 其中,根据不同的业务场景,"**Reactor 的数量**" 以及 "**事件处理器的单进程/线程 or 多进程/线程**" 可灵活变化: - 单 Reactor & 单进程 / 线程 - **单 Reactor & 多进程 / 线程** - ~~多 Reactor & 单进程 / 线程~~(无意义,实际中没有应用) - **多 Reactor & 多进程 / 线程** ##### Reactor 模式的应用场景 - **高并发服务器**:Redis、Nginx、Netty 等都**采用了 Reactor 模式** 来处理**大量并发连接**。 - **异步 I/O 应用**:聊天室、游戏服务器等,采用 Reactor 模式来避免 I/O 阻塞,**提高系统响应性**。 - **事件驱动框架**:Java NIO、Node.js、libevent 等事件驱动框架均**基于 Reactor 模式**。 ### 单 Reactor & 单进程 / 线程 ![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-3487B9FD3103243C19845ED4E44B5E95.png|637]] ### 单 Reactor & 多线程 主线程只负责监听文件描述符上是否有事件发生,而具体的事件处理由 "工作线程" 完成。 ![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-1F0A9D6381CA1753F2F6EB4316A4608B.png|530]] ![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-01412B33DD8C861800C518854B3FF1E9.png|603]] ### 多 Reactor & 多线程 ![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-8EDF99FA43E9633E37A608A07819AF2F.png|580]] - **主线程 MainReactor**:只监听 "**连接建立**" 事件,收到事件后通过 Acceptor 对象调用 `accept` 创建连接,**将该连接分配给某个 SubReactor 子线程**。 - **子线程 SubReactor**:只监听 "**I/O**" 事件,事件发生时调用相应的 Handler 进行处理。 > [!info] 开源软件 Netty 与 Memcache 都采用了 "**==多 Reactor 多线程==**" 的方案,而 Nginx 采用了 "**==多 Reactor 多进程==**" 的方案。 <br> ## Proactor 模式 Proactor:基于**异步 I/O** 的网络模式,感知的是 "**已完成**" 的读写事件。 Reactor 可以理解为「**来了事件操作系统通知应用程序,让应用程序来处理**」,而 Proactor 可以理解为 「**来了事件操作系统来处理,处理完再通知应用进程**」。 ![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-2B66B01E23DADCE47BF8111FDCB04900.png|751]] <br><br><br> # eventfd 事件通知 > eventfd 为 Linux 特有,自内核 2.6.22 版本起提供,位于头文件 `<sys/eventfd.h>` 中 eventfd 是 Linux 特有的基于 **文件描述符** 的轻量级 **事件通知机制** [^1]。 常见用途: - **事件通知**:可用于线程或进程间的事件通知,结合 epoll 等 I/O 多路复用机制,实现多线程下的**事件唤醒机制**。 - **实现信号量**:可用于实现**用户态的信号量**; #### 具体用法 1. `eventfd()` 系统调用会创建一个由内核维护的 **8 字节无符号整型数** (`uint64_t`),作为 **==计数值==**,返回指向其的**文件描述符**。 2. 对 eventfd 的**事件监听**: - 当**计数值非 0** 时,使用 `select/poll/epoll` 监听,将触发 "**==可读事件==**"。 3. 对 eventfd 的**读写**: - `write()` 向 eventfd 写入整数时,会**累加**到**计数值**上; - `read()` 向 eventfd 读取时: - 若计数值非 0,则**返回计数值,并==重置为 0==**(若设置 `EFD_SEMAPHORE`,则**计数值只减 1 & read 返回 1**) - 若计数值已为 0,则**阻塞**。 #### API ```cpp # 创建成功时, 返回一个文件描述符; -1 表示失败 int eventfd(unsigned int initval, int flags); ``` - `initval`:初始计数值 - `flags`: - `EFD_NONBLOCK`:非阻塞模式 - `EFD_CLOEXEC`:`exce` 后进程不继承该文件描述符 - `EFD_SEMAPHORE`:使 `read` 值**只返回 1**,即 **==计数值只减少 1==**,可用于实现信号量(默认**返回计数值 & 重置为 0**) #### 使用示例 示例一: ```cpp #include <sys/epoll.h> #include <sys/eventfd.h> #include <unistd.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> int main() { int efd = eventfd(0, EFD_NONBLOCK); int epfd = epoll_create1(0); struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = efd; epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev); uint64_t val = 1; write(efd, &val, sizeof(val)); // 触发事件 struct epoll_event events[1]; int nfds = epoll_wait(epfd, events, 1, -1); if (nfds > 0 && (events[0].events & EPOLLIN)) { read(efd, &val, sizeof(val)); printf("Event triggered, value: %lu\n", val); } close(efd); close(epfd); return 0; } ``` 示例二: ```cpp #include <chrono> #include <queue> #include <string> #include <mutex> #include <atomic> #include <sys/epoll.h> #include <sys/eventfd.h> #include <thread> #include <unistd.h> using namespace std; /* 基于eventfd + epoll实现多线程间通信 * 本质: eventfd 用作信号量 * 主线程: 向消息队列中加入消息, 写入eventfd计数; * 工作线程: epoll监听eventfd, 收到通知后从消息队列中取出. */ queue<string> msg_queue; mutex queue_mtx; atomic<bool> running(true); // 工作线程 void workerThread(int epollfd, int eventfd, int thread_id) { struct epoll_event events[1]; while (running) { int nfds = epoll_wait(epollfd, events, 1, -1); if (nfds == -1) { perror("epoll_wait"); break; } if (events[0].data.fd == eventfd) { uint64_t value; read(eventfd, &value, sizeof(value)); // 读取eventfd; lock_guard<mutex> lock(queue_mtx); if (!msg_queue.empty()) { string msg = msg_queue.front(); msg_queue.pop(); printf("[Thread %d]: %s\n", thread_id, msg.c_str()); } } this_thread::sleep_for(chrono::seconds(3)); } printf("线程%d退出\n", thread_id); } // 添加消息 void addMessage(int efd, const string& msg) { { lock_guard<mutex> lock(queue_mtx); msg_queue.push(msg); } uint64_t value = 1; write(efd, &value, sizeof(value)); } int main() { // 创建eventfd; int efd = eventfd(0, EFD_NONBLOCK); if (efd == -1) { perror("eventfd"); return 1; } // 创建epoll int epfd = epoll_create1(0); // 创建 epoll if (epfd == -1) { perror("epoll_create1"); close(efd); return 1; } // 为eventfd注册监听 struct epoll_event ev; ev.data.fd = efd; ev.events = EPOLLIN; if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev) < 0) { perror("epoll_ctl"); close(efd); close(epfd); } // 启动工作线程 int num_threads = 4; vector<thread> workers; for(int i = 0; i < num_threads; ++i) { workers.emplace_back(workerThread, epfd, efd, i); } // 主线程添加消息 for (int i = 0; i < 10; ++i) { addMessage(efd, "消息_" + to_string(i)); printf("已发送: %d\n", i); uint64_t value = 1; write(efd, &value, sizeof(value)); this_thread::sleep_for(chrono::seconds(1)); } while (true) { { std::lock_guard<std::mutex> lock(queue_mtx); if (msg_queue.empty()) break; // 任务队列为空,退出循环 } std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 防止忙等 } // 需要再通过eventfd通知线程退出, 否则线程将一直阻塞在epoll_wait. running = false; for (int i = 0; i < num_threads; ++i) { uint64_t value = 1; write(efd, &value, sizeof(value)); } // 等待线程结束 for (auto& worker: workers) { worker.join(); } close(epfd); close(efd); printf("主线程退出\n"); return 0; } ``` <br><br> # Buffer ## 闪念 > sudden idea ## 候选资料 > Read it later # ♾️参考资料 - [9.2 I/O 多路复用:select/poll/epoll | 小林coding](https://xiaolincoding.com/os/8_network_system/selete_poll_epoll.html#i-o-%E5%A4%9A%E8%B7%AF%E5%A4%8D%E7%94%A8) # Footnotes [^1]: 《Linux-UNIX 系统编程手册》P722