%%
# 纲要
> 主干纲要、Hint/线索/路标
# Q&A
#### 已明确
#### 待明确
> 当下仍存有的疑惑
**❓<font color="#c0504d"> 有什么问题?</font>**
# Buffer
## 闪念
> sudden idea
## 候选资料
> Read it later
%%
# 同步原语 ⭐
同步原语(synchronization primitive)是用于**协调多个线程间执行顺序、访问共享资源的一组==机制==**,
确保**各线程在特定条件下按照正确的顺序执行**,旨在避免 "**竞态条件**"、"**数据不一致**"、"**死锁**" 等问题。
### 基本的几种同步原语
- **互斥原语**(Mutual Exclusion Primitives):**防止==多个线程或进程==在同一时刻==访问临界区或共享资源==**。
- **互斥锁**(Mutex):保证同一时刻只有一个线程能进入临界区。
- **自旋锁**(Spinlock):忙等待的锁,用于临界区较短的场景。
- **读写锁**(Reader-Writer Lock):允许多线程同时读,但写操作是互斥的。
- **条件变量**(Condition Variable):用于**线程间的==等待和通知==机制**。
- **信号量**(Semaphore):控制**资源的==并发访问数量==**。
- **屏障**(Barrier):确保**所有线程到达同步点**后再继续。
- **原子操作**(Atomic Operation):不可分割的操作,用于简单的同步需求。
### "互斥" 与 "同步"
- **互斥**:确保**同一时刻只有一个线程能==访问 "共享资源"==(进入临界区)**
- 例如:线程 A 与线程 B **不能同时修改一个全局变量**
- **同步**:通过 **"无竞争"** 的 "**等待/唤醒**" 机制,协调并发进程/线程间的**执行顺序**
- 例如:**线程 B 必须==等待==线程 A 执行完某个操作之后才能执行**
> [!NOTE] **互斥原语**是**同步原语的子集**
>
> 互斥的本质即让多线程 "**==串行化==**",**阻止对共享资源的并发访问**,令各线程**只能先后串行依次访问临界区或共享资源**。
<br><br>
# 锁
在多线程并发编程中,**锁**(Lock)是用于**防止多个线程在同一时间访问共享资源的互斥机制**,避免**出现竞态条件和数据不一致**问题。
常见的多线程锁包括:
- **乐观锁**:基于乐观假设(并发环境下对共享资源的访问冲突少见),采用 "**冲突检测机制**"(版本号或 CAS 操作),**更新共享数据时不加锁**,仅在 "**提交时**" 进行冲突检测,**若冲突则回滚操作**。
- **悲观锁**:基于悲观假设(并发环境中**对共享资源的访问冲突**频繁常见),因此**每次访问共享资源时都必须==直接加锁互斥==**。
- **互斥锁**(Mutual Exclusion Lock):请求锁失败而 **阻塞等待** 时,线程会 **==释放 CPU==**;
- **自旋锁**(Spinlock):加锁失败后,线程会 **==忙等待==**,直到它拿到锁;
- **读写锁**(Reader-Writer Lock):"读-读"可并发,"读-写" & "写-写" 互斥。
- **递归锁**(Recursive Mutex):允许同一线程**在递归调用中多次获取该锁**,**而每次加锁都需要有对应的解锁操作**。直到所有锁被释放,其他线程才能获取该锁。
> [!caution] 线程调度由操作系统决定
>
> **线程调度**由操作系统的调度器控制, 而调度器的行为可能因为多种因素而有所不同,包括系统当前负载、线程优先级等。
>
> **即使 `t1` 线程先被创建和启动,也不能保证其会先于 `t2` 线程获得互斥锁**。
> 即便两个线程几乎同时达到等待锁的时间点,也是**由操作系统来决定哪个线程先获取锁**。
> [!NOTE] 锁不能保证获取顺序!当一把锁释放后,**所有等待该锁的线程共同竞争**,最先申请锁的线程不一定能获取!
>
> 如果**多个线程均阻塞着等待某个锁**,则在这一锁被释放时,**任何==等待线程==都将有机会竞争锁**,最终谁能获得该锁取决于操作系统的调度。
<br>
## 互斥锁
「**互斥锁**」:最基本的锁,**确保同一时刻==只有一个线程能够访问共享资源==**。当一个线程获得互斥锁后,**其他线程必须等待,直到锁被释放**。
**使用场景**:需要确保**只有一个线程访问共享资源**时。
![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-9A5EC13D2162F0522AE51EAD8FF66D4E.png|476]]
```cpp
std::mutex mtx;
void thread_function() {
mtx.lock(); // 获取锁
// 临界区代码
mtx.unlock(); // 释放锁
}
```
<br>
## 自旋锁
「**自旋锁**」:**当一个线程尝试获取锁时,如果锁已经被占用,==线程会保持活跃、不断地轮询检查锁的状态,直到获取锁为止==,而不会进入睡眠**。
> [!info] "自旋" 与 "忙等待" 的含义
>
> - **自旋**(spin)是指线程在等待获取锁的过程中,不进入休眠状态,**保持活跃**(占用 CPU),**持续地反复循环检查锁的状态,直至获取到锁为止**。
> - **忙等待**(busy waiting)—— 持续检查一个变量,直到某个值出现为止。
> [!caution] 在单核 CPU 上,需要抢占式调度器(**即不断通过时钟中断一个线程,运行其他线程**),否则一个自旋线程永远不会主动放弃 CPU
**适用场景**:适用于**临界区执行时间非常短的场景**——"**自旋" 比让线程睡眠后再唤醒开销更低的情况**。
```cpp
std::atomic_flag lock = ATOMIC_FLAG_INIT;
void thread_function() {
while (lock.test_and_set(std::memory_order_acquire)) {
// 自旋等待锁释放
}
// 临界区代码
lock.clear(std::memory_order_release); // 释放锁
}
```
<br>
## 读写锁
> 读写锁也称 "**共享互斥锁**"(shared-exclusive lock)
「**读写锁**」:**==允许多个线程同时读取==共享资源**,**但==只允许一个线程写入==共享资源**。
**适用场景**:适用于**读多写少**的场景,如缓存、配置文件读取等。
特点如下:
- **读可并发**,而 **写-写互斥**
- **读-写互斥**:读写互斥,不能同时进行。
- "**==读锁==**" 必须等 "**==写锁==**" 释放后才能获取
- "**==写锁==**" 必须等==所有 "**读锁**"== 释放后才能获取
> [!NOTE] C++中的 `std::shared_mutex` 即用作读写锁
```cpp
std::shared_mutex rw_lock;
void read() {
std::shared_lock<std::shared_mutex> lock(rw_lock); // 共享锁,多个线程可以读
// 读操作
}
void write() {
std::unique_lock<std::shared_mutex> lock(rw_lock); // 独占锁,写操作互斥
// 写操作
}
```
<br>
### "读优先锁" 与 "写优先锁"
根据实现的不同,有三两种类型:
- **==读优先锁==**:当已有线程持有读锁时,其余**线程可继续获得读锁**,**==写锁==必须等到==所有读锁释放后==才能获取**。
* 特点:**读优先**,但可能**饿死写线程**
* 实现思路: 当一个线程:
* 1) 申请读锁时, 若**没有其它线程持有读锁**, 则 **请求"读写互斥锁"**;
* 2) 释放读锁时, 若其**是最后一个"读锁**", 则**释放"读写互斥锁"**;
* 3) 请求写锁时 => **直接竞争** "读写互斥锁"
* 4) 释放写锁时 => **直接释放** "读写互斥锁"
- **==写优先锁==**:当已有线程持有读锁,而**写者线程==请求写锁被阻塞等待==时,将==阻塞后续所有线程获取读锁==**。
- 特点:**写优先**,但可能**饿死读线程**
- 实现思路:
- 1) 当一个线程**申请读锁**时, 若存在运行or等待中的写者, 则其阻塞 => 等待获取"读锁".
* 2) 当一个线程**释放"最后一个"读锁**时, 唤醒"等待写锁"的线程.
* 3) 当一个线程**申请写锁**时, 若存在运行中的读者或者其他写者, 则阻塞 => "等待获取"写锁"
* 4) 当一个线程**释放写锁**时, "**==优先"唤醒"等待写锁的线程==**", 其次唤醒所有 "等待读锁的线程".
- **==公平读写锁==**:用队列把**获取锁的线程排队**,不管是**写线程还是读线程都按照==先进先出的原则==加锁**,读线程仍然可以并发,也不会出现「饥饿」的现象。
![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-419EB0FDEB255857B45B6A18F0282BD1.png|1016]]
<br>
## 递归锁
「**递归锁**」:允许同一线程**在递归调用中多次获取该锁**而不会发生死锁/阻塞,**每次加锁都需要有对应的解锁操作**。直到所有锁被释放,其他线程才能获取该锁。
适用场景:在**递归函数或多个方法调用**中,某个线程需要**多次进入同一个临界区**。
```cpp
std::recursive_mutex rmtx;
void recursive_function(int depth) {
rmtx.lock();
if (depth > 0) {
recursive_function(depth - 1);
}
rmtx.unlock();
}
```
<br><br><br>
# 条件变量
> ❓<font color="#c00000">什么是条件变量?</font>
条件变量:用于 "**线程之间==等待和通知==**" 的同步机制,**使线程在等待某个条件成立时==挂起进入休眠状态==**,而**在条件满足时被唤醒继续执行**,需要与 **==互斥锁==** 一起使用。
使用场景:**生产者-消费者**模型、**任务队列**。
> [!note] 条件变量机制下,线程在==等待条件成立时休眠,不占用 CPU==,直到被唤醒。
>
> 如果不使用条件变量,也可以采用 "**自旋**" 的方式,例如在 `while(cond == False)` 循环里持续检查直至 `cond` 为 `true`,但自旋会使得**该线程浪费 CPU 时间**。
<br>
## 条件变量机制
> ❓<font color="#c00000">条件变量的用法是什么?</font>
要素:
- **互斥锁**:用于**保护对 "条件" 的访问**
- **条件状态**:等待线程**所等待成立的特定条件**,例如某个共享变量的值是否为特定值,或某个谓词函数是否为真。
- **条件变量**:用于提供 "**等待/唤醒**" 这一线程间同步机制的 **API**,其标识了一个 "**等待队列**";
1. 在 "条件" 不满足时,**让线程阻塞等待**;
2. 在 "条件" 满足时,**唤醒等待中的线程**。
两过程:
- **等待**:当一个线程**检查发现条件不满足**时,**==进入休眠==状态等待**(如调用 `pthread_cond_wait(&cv, &mutex)`)。
- 阻塞等待期间,该线程会 **==释放与条件变量相关的互斥锁==**,从而允许其他线程获取锁并**修改共享数据**。
- 被唤醒后(从`pthread_cond_wait()` 返回),**该线程==重新获得锁==**。
- **通知**:当某个线程**改变了共享数据的状态**后,通过条件变量来 **==通知/唤醒==** 等待的线程
- 如调用`pthread_cond_signal(&cond)` 或 `pthread_cond_broadcast(&cond)` 进行唤醒
> [!quote]
> ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-1CB023178E624FD62F998041AFBFC439.png|442]]
<br>
## 注意事项 🚨
> [!caution] **使用条件变量进行 "等待&通知"(调用 `wait` 与 `signal` ) 时都必须==持有互斥锁==**
>
> - 对于 "**进行等待**" 的线程,要保证 "**检查条件**" 的操作是原子性的。
> - 对于 "**进行通知**" 的线程,要保证 "**改变共享数据**" 的操作是原子性的。
> [!caution] 阻塞等待的线程可能被 **==虚假唤醒==**!必须使用 `while()` 来**检查条件**,而不是 `if()`,
>
> 等待线程可能受 OS 调度影响而被 "**唤醒**",即被唤醒时条件并不一定满足,因此**必须外套 `while` 在其被唤醒后再次判断**。
>
> 例如在生产者-消费者模式中,若存在多个消费者,则**某个消费者被唤醒时**,可能缓冲区中数据已被其他消费者用掉!
>
> 示例:
>
> ```c
> while (!ready) { // 防止虚假唤醒——唤醒后再次进行条件判断
> pthread_cond_wait(&cond, &mutex);
> }
> // 条件满足后继续执行
> ```
>
> ^6hqcq5
<br>
## 使用示例
#### C++ API 示例
```cpp
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void wait_thread() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return ready; }); // 等待ready为true
// 临界区代码
}
void signal_thread() {
std::lock_guard<std::mutex> lock(mtx);
ready = true;
cv.notify_one(); // 唤醒等待的线程
}
```
#### POSIX API 示例
```c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
int done = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 通知线程
void* thr_exit(void* arg) {
int rc;
rc = pthread_mutex_lock(&mutex); assert(rc == 0);
done = 1;
rc = pthread_cond_signal(&cond); assert(rc == 0);
rc = pthread_mutex_unlock(&mutex); assert(rc == 0);
printf("thr_exit() has been called.\n");
return NULL;
}
// 等待线程
void* thr_join(void* arg) {
int rc;
rc = pthread_mutex_lock(&mutex); assert(rc == 0); // 检查共享变量done, 需要加锁.
while (done == 0) { // 当done为0时, 线程进入休眠等待状态, 释放互斥锁, 等待被唤醒.
rc = pthread_cond_wait(&cond, &mutex); assert(rc == 0);
}
rc = pthread_mutex_unlock(&mutex); assert(rc == 0);
printf("thr_join() has been called.\n");
return NULL;
}
int main() {
pthread_t p1, p2;
int rc;
rc = pthread_create(&p1, NULL, thr_exit, NULL); assert(rc == 0);
rc = pthread_create(&p2, NULL, thr_join, NULL); assert(rc == 0);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
pthread_mutex_destroy(&mutex);
return 0;
}
```
<br><br><br>
# 信号量
> ❓ <font color="#c00000">什么是信号量?</font>
**信号量**(Semaphore)是用于**控制访问共享资源的==并发线程或进程数量==** 的同步机制。
信号量可视作一个 "**==计数器==**":
- **值为正**时:表示 "**==可用的共享资源数量==**"';
- **值为负**时,表示 "**==阻塞等待的线程/进程个数==**"。
## 信号量的机制
信号量提供了两个基本的**原子操作**:**==P 操作==**、**==V 操作==**
- **P 操作**:**递减信号量** & **==阻塞等待==** (API示例:`sem_wait()`)
- 信号量计数**减 1**,表示**获取了一个资源**。
- 若信号量减 1 后为负,则**该线程/进程进入==休眠等待状态==**(进入**等待队列**),等待该信号量 `+1` 后被唤醒。
- **V 操作**:**增加信号量** & **==通知==** (API 示例:`sem_post()`)
- 信号量计数**加 1**,表示**释放了一个资源**。
- 若有线程/进程在**该信号量对应的等待队列**中,则 **==唤醒一个等待线程/进程==**,使其继续执行。
进程/线程**访问一个共享资源**前,需执行 **P 操作** 检查是否有可用的共享资源,若有则减少信号量值,若无则阻塞。
进程/线程**不再占有一个共享资源**时,需执行 **V 操作** 增加信号量值,并唤醒一个正在等待资源的线程。
> [!Info] 信号量的实现伪代码(OS 确保 P、V 操作为 "==**原子操作**==" )
> ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-4E3EA2D608E5F0FEA64DDF5E8867421C.png|370]]
>
<br>
## 信号量的用途
根据信号量 "**初始值**" 的不同,有两种使用场景:
- **二值信号量**(Binary Semaphore)
- 其计数器只能为 "**0 或 1**",可用作 **互斥锁**(初值为 1)、**条件变量**(初值为 0);
- **计数信号量**(Counting Semaphore)
- 其计数器为 "**任意非负整数**",用于 **==控制同时访问某一共享资源的线程/进程数量==**。
## 使用示例
#### 用作互斥锁
**信号量==初始化值为 1==**,调用 `sem_wait()` **将计数降为 0 相当于获取锁**。
> [!NOTE] 信号量用作 "互斥锁" [^1]
> ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-C4AE9A7B84F4A4887A61727CC74084F4.png|559]]
```c
sem_t m;
sem_init(&m, 0, X); // initialzie semaphore to X; what should X be?
sem_wait(&m);
// critical section here
sem_post(&m);
// 销毁
sem_destroy(&sem);
```
#### 用作条件变量
> [!NOTE] 信号量用作 "条件变量" [^1]
>
> ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-D18B4D509084B42C6F1B3AE0C1DE0CCB.png|476]]
>
**信号量==初始化值为 0==**,线程 A **调用 `sem_wait()` 时进入等待**,**等到其他线程 B 调用 `sem_post` 将值+1 时进行唤醒**。
```c
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
#include <assert.h>
// 信号量用作条件变量
sem_t s;
void* thread_func(void* arg) {
printf("thread run\n");
sem_post(&s); // 信号量加1
return NULL;
}
int main() {
sem_init(&s, 0, 0); // 信号量s用作条件变量, 初始化为0.
printf("main:begin\n");
// 线程函数.
pthread_t p;
int rc = pthread_create(&p, NULL, thread_func, NULL); assert(rc == 0);
sem_wait(&s); // 信号量减1, 主线程等待被子线程唤醒. 这里效果等价于pthread_join().
printf("main:end\n");
sem_destroy(&sem);
}
```
<br><br><br>
# 原子操作
「**原子操作**」是指 "**==不可分割==的线程安全操作**",即对任意线程可见而言,**该操作要么完全完成,要么完全没做**。
原子操作一旦执行,直至结束,**期间不会被其他线程或进程打断**,故支持在多线程并发环境下 **==无锁地==对共享数据进行安全修改**。
> [!NOTE] "原子操作" 的优点
>
> 实际上,采用 "**互斥保护**" 就可以将多个操作合并,实现原子化。
> **"原子操作" 的优点在于其内部由 "==原子指令==" (硬件层面的 CPU 指令)直接实现**,无需锁,故**省略了申请/释放锁所带来的开销**。
> [!NOTE] 原子操作是实现 **无锁数据结构(Lock-Free Data Structures)** 的基础
> [!info] C11 标准中引入的头文件 `<stdatomic.h>`, C++11 标准引入的 `<atomic>` 头文件中均提供了**一套线程安全的原子操作 API**。
> 参见 [[02-开发笔记/01-cpp/多线程并发相关/cpp-多线程&同步#C++ 原子操作|cpp-多线程&同步: C++原子操作]]
> [!caution] 非原子操作可能导致的数据错杂
>
> 假设对 C/C++ 中的一个**大型结构体(包含多个数据成员)赋值**,**若该赋值操作是无锁的**,由于其**并非 "原子操作"**,因此可能在**线程 A 赋值进行到一半时**,**另一个线程读取结构体值**,就得到了 "**只有部分数据成员被赋予新值**" 的数据混乱结果。
<br><br>
## CAS (比较并交换)
**比较并交换**(Compare-And-Swap,**==CAS==**)是一种**原子操作**,用以**在变量等于期望值时,将其更新为指定的新值**。
> [!note] CAS 的功能: **比较变量的==当前值与预期值==,如果它们相等,则将数据==更新为新的值==**,否则什么都不做。
- 优点:**无锁开销,无线程上下文切换的开销,无阻塞**,可提高并发性能。
- 缺点:
- **==ABA问题==**:某些场景下,**变量可能先从A变为B,随后又变回A**,而 **CAS 操作无法区分这种情况**,可能会误认为数据没有变化。
- **高自旋成本**:如果CAS操作频繁失败,导致线程不断自旋重试,可能会消耗大量的CPU资源
### CAS 功能
> [!info] C++标准库中的 `std::atomic` 提供了 `compare_exchange_strong` 与 `compare_exchange_weak` 两种 CAS 原子操作函数
```cpp
// CompareAndSwap的伪代码
int CompareAndSwap(int* address, int& expected, int new_value) {
int actual = *address;
if (actual == expected) {
*address = new_value;
} else {
expected = actual;
}
return actual; // 无论指定内存地址的当前值是否等于预期值, 都返回其旧值.
}
```
### ABA 问题
「ABA 问题」:CAS 操作**只能检查变量值是否与期望值一致**,但 **==无法检测变量值在此期间是否发生过变化==**——**是否被修改后又改回了原值**。
> [!example] ABA 问题说明
>
> - 一个变量初始值为`A`,线程 1 读取 `A` 之后作为期望值,并准备**稍后执行 CAS 操作**。
> - 另一个线程 2 将变量值从 `A` 修改为 `B`,随后又改回了 `A`;
> - 线程 1 执行 CAS 操作,**检查发现变量值仍然为 `A`,故 CAS 操作成功,变量被更新为新值**。
>
> 由于线程 1 不知道变量曾经被修改过,因此可能导致逻辑错误。
>
> 例如,在**基于指针的 "无锁栈**" 中,**指针的 ABA 问题**可能导致错误逻辑:
> 假设初始时栈中节点为 `[Node1<-Node2<-Node3]`,**栈顶指针 `head` 指向 `Node3`**。
>
> - 线程 A 调用 `pop()`,该函数中首先通过`load` **读取栈顶元素值**后,**即将调用 `CAS` 令 head 指向 Node2**。
> - 线程 B 此时**先将 Node3 出栈并释放**,随后又**将一个新节点 Node4 压入**,由于内存复用,**新分配的 ==Node4 地址与之前的 Node3 地址相同==**,此时栈顶指针指向 Node4,栈内元素为 `[Node1<-Node2<-Node4]`。
> - 线程 A 执行 CAS 操作,**判断得到==栈顶的 Node4 地址与预期值 Node3 地址相等==**,认为 "**栈顶节点未变化过**",故**弹出 Node4 并释放其内存**。
>
> 然而,**对线程 B 而言,Node4 是其压入并需要使用的节点,当线程 B 再次访问 Node4 时就将发生错误**。
>
#### 解决 ABA 问题的方法
可引入如下机制,检测变量的修改历史:
1. **使用「版本号」**:**将一个版本号与变量值绑定**,CAS 操作不仅检查变量值,还检查版本号是否匹配;
2. **使用「指针标志位」**:在 64 位系统中,指针通常只使用低 48 位地址,高 16 位可以作为标志位,可以**利用指针的高位存储标志位(tag),每次修改时更新标志位以检测变化**。
<br><br>
# 参考资料
# Footnotes
[^1]: 《UNIX 网络编程》(P176)