%%
# 纲要
> 主干纲要、Hint/线索/路标
# Q&A
#### 已明确
#### 待明确
> 当下仍存有的疑惑
**❓<font color="#c0504d"> 有什么问题?</font>**
%%
# 基于事件的并发 ⭐
「**基于事件的并发**」是一种高效的**并发编程模型**,通过 **==非阻塞 I/O==** 和 **==事件驱动机制==** 来处理大量并发任务,而不依赖多线程或多进程,**避免多线程并发中的同步问题**,特别适用于**处理 ==I/O 密集型任务==**(如网络服务器处理网络请求,文件 I/O 等)。
常见应用场景:
- **高并发服务器**:如 `Nginx`,采用基于事件的并发模型,**可处理大量并发连接**,而不需要为每个连接创建独立的线程。
- **网络编程**:例如 `Node.js`,基于事件驱动的编程模型,适用于处理大量的**异步 I/O 请求**。
- **图形用户界面应用**:GUI 框架中常使用事件循环来处理用户的点击、键盘输入等事件,如 Qt、Tkinker
- **游戏开发**:游戏引擎通过事件循环来处理用户输入、动画帧更新等。
### 基于事件的并发的特点
- **==基于事件循环==**:程序**只运行单个线程**,其上**运行事件循环**,**持续监听事件并处理**。
- **==非阻塞 I/O==**(异步 I/O):基于事件的并发机制 **==不允许阻塞 I/O==**,要求必须使用 "**异步 I/O**" 进行事件处理。
- **回调机制**:当事件发生时,**调用与事件关联的回调函数**进行处理。
优点:
- **高效的 I/O 处理**:非阻塞 I/O 可以更高效地处理大量的并发请求,特别是 I/O 密集型任务(如网络服务器)。
- **低开销**:由于基于事件的并发不需要为每个任务创建线程或进程,所以内存和 CPU 资源开销较小。
- **避免复杂的线程同步**:事件驱动的模型避免了多线程编程中常见的竞争条件、锁和死锁问题。
缺点:
- **代码复杂度**:大量使用回调函数和状态机可能导致代码难以理解和维护,特别是在存在多个嵌套回调时(即“回调地狱”)。
- **不适合 CPU 密集型任务**:基于事件的模型非常适合 I/O 密集型任务,但对于 CPU 密集型任务,由于它在一个线程中运行,可能会导致性能瓶颈。
### 基于事件的并发的工作流程
**==事件循环==**(event loop)是核心,伪代码如下:
```c
while (1) { // 事件循环
events = getEvents(); // 接收事件
for (e in events)
processEvent(e); // 调用事件处理程序, 响应事件.
}
```
### 工作流程
1. **事件注册**:程序将预期 "**事件**" 及对应的 "**事件处理函数**" 注册到事件循环总
2. **事件监听**:"事件循环" 不断地等待事件发生(通常是 I/O 完成,用户输入等)
3. **事件分发与处理**:当事件发生时,**事件循环会调用注册的回调函数**来处理该事件。
4. **继续等待**:处理完当前事件后,**程序不会阻塞**,而是继续监听其他事件。
<br><br><br>
# 高性能并发模式
两种常见的**高性能网络并发设计模式**(即关于 "**事件处理**" 的设计模式):
- 「Reactor 模式」:**==非阻塞 I/O==** + **==I/O 多路复用==** (Non-blocking & I/O multiplexing)
- 「Proactor 模式」:**==异步 I/O==** + I/O 多路复用
## Reactor 模式(反映堆模式)
本质:**I/O 多路复用 + 非阻塞 I/O**,采用 "**面向对象**" 的方式进行封装。
Reactor 模式中的主要组件:
- **==Reactor==**(**事件多路分发器**): 负责**监听 I/O 事件**、使用 I/O 多路复用 API (如 `epoll`)来**等待事件发生**,将发生事件**分发给相应的处理器处理**。
- **==Handler== (事件处理器)**:负责**处理事件**,根据具体事件进行读、写、**业务处理**等。
- **Acceptor**(**资源管理器**):负责**建立连接**
其中,根据不同的业务场景,"**Reactor 的数量**" 以及 "**事件处理器的单进程/线程 or 多进程/线程**" 可灵活变化:
- 单 Reactor & 单进程 / 线程
- **单 Reactor & 多进程 / 线程**
- ~~多 Reactor & 单进程 / 线程~~(无意义,实际中没有应用)
- **多 Reactor & 多进程 / 线程**
##### Reactor 模式的应用场景
- **高并发服务器**:Redis、Nginx、Netty 等都**采用了 Reactor 模式** 来处理**大量并发连接**。
- **异步 I/O 应用**:聊天室、游戏服务器等,采用 Reactor 模式来避免 I/O 阻塞,**提高系统响应性**。
- **事件驱动框架**:Java NIO、Node.js、libevent 等事件驱动框架均**基于 Reactor 模式**。
### 单 Reactor & 单进程 / 线程
![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-3487B9FD3103243C19845ED4E44B5E95.png|637]]
### 单 Reactor & 多线程
主线程只负责监听文件描述符上是否有事件发生,而具体的事件处理由 "工作线程" 完成。
![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-1F0A9D6381CA1753F2F6EB4316A4608B.png|530]]
![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-01412B33DD8C861800C518854B3FF1E9.png|603]]
### 多 Reactor & 多线程
![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-8EDF99FA43E9633E37A608A07819AF2F.png|580]]
- **主线程 MainReactor**:只监听 "**连接建立**" 事件,收到事件后通过 Acceptor 对象调用 `accept` 创建连接,**将该连接分配给某个 SubReactor 子线程**。
- **子线程 SubReactor**:只监听 "**I/O**" 事件,事件发生时调用相应的 Handler 进行处理。
> [!info] 开源软件 Netty 与 Memcache 都采用了 "**==多 Reactor 多线程==**" 的方案,而 Nginx 采用了 "**==多 Reactor 多进程==**" 的方案。
<br>
## Proactor 模式
Proactor:基于**异步 I/O** 的网络模式,感知的是 "**已完成**" 的读写事件。
Reactor 可以理解为「**来了事件操作系统通知应用程序,让应用程序来处理**」,而 Proactor 可以理解为 「**来了事件操作系统来处理,处理完再通知应用进程**」。
![[_attachment/02-开发笔记/99-Unix 环境编程/高级 IO.assets/IMG-高级 IO-2B66B01E23DADCE47BF8111FDCB04900.png|751]]
<br><br><br>
# eventfd 事件通知
> eventfd 为 Linux 特有,自内核 2.6.22 版本起提供,位于头文件 `<sys/eventfd.h>` 中
eventfd 是 Linux 特有的基于 **文件描述符** 的轻量级 **事件通知机制** [^1]。
常见用途:
- **事件通知**:可用于线程或进程间的事件通知,结合 epoll 等 I/O 多路复用机制,实现多线程下的**事件唤醒机制**。
- **实现信号量**:可用于实现**用户态的信号量**;
#### 具体用法
1. `eventfd()` 系统调用会创建一个由内核维护的 **8 字节无符号整型数** (`uint64_t`),作为 **==计数值==**,返回指向其的**文件描述符**。
2. 对 eventfd 的**事件监听**:
- 当**计数值非 0** 时,使用 `select/poll/epoll` 监听,将触发 "**==可读事件==**"。
3. 对 eventfd 的**读写**:
- `write()` 向 eventfd 写入整数时,会**累加**到**计数值**上;
- `read()` 向 eventfd 读取时:
- 若计数值非 0,则**返回计数值,并==重置为 0==**(若设置 `EFD_SEMAPHORE`,则**计数值只减 1 & read 返回 1**)
- 若计数值已为 0,则**阻塞**。
#### API
```cpp
# 创建成功时, 返回一个文件描述符; -1 表示失败
int eventfd(unsigned int initval, int flags);
```
- `initval`:初始计数值
- `flags`:
- `EFD_NONBLOCK`:非阻塞模式
- `EFD_CLOEXEC`:`exce` 后进程不继承该文件描述符
- `EFD_SEMAPHORE`:使 `read` 值**只返回 1**,即 **==计数值只减少 1==**,可用于实现信号量(默认**返回计数值 & 重置为 0**)
#### 使用示例
示例一:
```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
int main() {
int efd = eventfd(0, EFD_NONBLOCK);
int epfd = epoll_create1(0);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = efd;
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev);
uint64_t val = 1;
write(efd, &val, sizeof(val)); // 触发事件
struct epoll_event events[1];
int nfds = epoll_wait(epfd, events, 1, -1);
if (nfds > 0 && (events[0].events & EPOLLIN)) {
read(efd, &val, sizeof(val));
printf("Event triggered, value: %lu\n", val);
}
close(efd);
close(epfd);
return 0;
}
```
示例二:
```cpp
#include <chrono>
#include <queue>
#include <string>
#include <mutex>
#include <atomic>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <thread>
#include <unistd.h>
using namespace std;
/* 基于eventfd + epoll实现多线程间通信
* 本质: eventfd 用作信号量
* 主线程: 向消息队列中加入消息, 写入eventfd计数;
* 工作线程: epoll监听eventfd, 收到通知后从消息队列中取出.
*/
queue<string> msg_queue;
mutex queue_mtx;
atomic<bool> running(true);
// 工作线程
void workerThread(int epollfd, int eventfd, int thread_id) {
struct epoll_event events[1];
while (running) {
int nfds = epoll_wait(epollfd, events, 1, -1);
if (nfds == -1) {
perror("epoll_wait");
break;
}
if (events[0].data.fd == eventfd) {
uint64_t value;
read(eventfd, &value, sizeof(value)); // 读取eventfd;
lock_guard<mutex> lock(queue_mtx);
if (!msg_queue.empty()) {
string msg = msg_queue.front();
msg_queue.pop();
printf("[Thread %d]: %s\n", thread_id, msg.c_str());
}
}
this_thread::sleep_for(chrono::seconds(3));
}
printf("线程%d退出\n", thread_id);
}
// 添加消息
void addMessage(int efd, const string& msg) {
{
lock_guard<mutex> lock(queue_mtx);
msg_queue.push(msg);
}
uint64_t value = 1;
write(efd, &value, sizeof(value));
}
int main() {
// 创建eventfd;
int efd = eventfd(0, EFD_NONBLOCK);
if (efd == -1) {
perror("eventfd");
return 1;
}
// 创建epoll
int epfd = epoll_create1(0); // 创建 epoll
if (epfd == -1) {
perror("epoll_create1");
close(efd);
return 1;
}
// 为eventfd注册监听
struct epoll_event ev;
ev.data.fd = efd;
ev.events = EPOLLIN;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev) < 0) {
perror("epoll_ctl");
close(efd);
close(epfd);
}
// 启动工作线程
int num_threads = 4;
vector<thread> workers;
for(int i = 0; i < num_threads; ++i) {
workers.emplace_back(workerThread, epfd, efd, i);
}
// 主线程添加消息
for (int i = 0; i < 10; ++i) {
addMessage(efd, "消息_" + to_string(i));
printf("已发送: %d\n", i);
uint64_t value = 1;
write(efd, &value, sizeof(value));
this_thread::sleep_for(chrono::seconds(1));
}
while (true) {
{
std::lock_guard<std::mutex> lock(queue_mtx);
if (msg_queue.empty()) break; // 任务队列为空,退出循环
}
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 防止忙等
}
// 需要再通过eventfd通知线程退出, 否则线程将一直阻塞在epoll_wait.
running = false;
for (int i = 0; i < num_threads; ++i) {
uint64_t value = 1;
write(efd, &value, sizeof(value));
}
// 等待线程结束
for (auto& worker: workers) {
worker.join();
}
close(epfd);
close(efd);
printf("主线程退出\n");
return 0;
}
```
<br><br>
# Buffer
## 闪念
> sudden idea
## 候选资料
> Read it later
# ♾️参考资料
- [9.2 I/O 多路复用:select/poll/epoll | 小林coding](https://xiaolincoding.com/os/8_network_system/selete_poll_epoll.html#i-o-%E5%A4%9A%E8%B7%AF%E5%A4%8D%E7%94%A8)
# Footnotes
[^1]: 《Linux-UNIX 系统编程手册》P722