%% # 纲要 > 主干纲要、Hint/线索/路标 # Q&A #### 已明确 #### 待明确 > 当下仍存有的疑惑 **❓<font color="#c0504d"> 有什么问题?</font>** # Buffer ## 闪念 > sudden idea ## 候选资料 > Read it later %% # 同步原语 ⭐ 同步原语(synchronization primitive)是用于**协调多个线程间执行顺序、访问共享资源的一组==机制==**, 确保**各线程在特定条件下按照正确的顺序执行**,旨在避免 "**竞态条件**"、"**数据不一致**"、"**死锁**" 等问题。 ### 基本的几种同步原语 - **互斥原语**(Mutual Exclusion Primitives):**防止==多个线程或进程==在同一时刻==访问临界区或共享资源==**。 - **互斥锁**(Mutex):保证同一时刻只有一个线程能进入临界区。 - **自旋锁**(Spinlock):忙等待的锁,用于临界区较短的场景。 - **读写锁**(Reader-Writer Lock):允许多线程同时读,但写操作是互斥的。 - **条件变量**(Condition Variable):用于**线程间的==等待和通知==机制**。 - **信号量**(Semaphore):控制**资源的==并发访问数量==**。 - **屏障**(Barrier):确保**所有线程到达同步点**后再继续。 - **原子操作**(Atomic Operation):不可分割的操作,用于简单的同步需求。 ### "互斥" 与 "同步" - **互斥**:确保**同一时刻只有一个线程能==访问 "共享资源"==(进入临界区)** - 例如:线程 A 与线程 B **不能同时修改一个全局变量** - **同步**:通过 **"无竞争"** 的 "**等待/唤醒**" 机制,协调并发进程/线程间的**执行顺序** - 例如:**线程 B 必须==等待==线程 A 执行完某个操作之后才能执行** > [!NOTE] **互斥原语**是**同步原语的子集** > > 互斥的本质即让多线程 "**==串行化==**",**阻止对共享资源的并发访问**,令各线程**只能先后串行依次访问临界区或共享资源**。 <br><br> # 锁 在多线程并发编程中,**锁**(Lock)是用于**防止多个线程在同一时间访问共享资源的互斥机制**,避免**出现竞态条件和数据不一致**问题。 常见的多线程锁包括: - **乐观锁**:基于乐观假设(并发环境下对共享资源的访问冲突少见),采用 "**冲突检测机制**"(版本号或 CAS 操作),**更新共享数据时不加锁**,仅在 "**提交时**" 进行冲突检测,**若冲突则回滚操作**。 - **悲观锁**:基于悲观假设(并发环境中**对共享资源的访问冲突**频繁常见),因此**每次访问共享资源时都必须==直接加锁互斥==**。 - **互斥锁**(Mutual Exclusion Lock):请求锁失败而 **阻塞等待** 时,线程会 **==释放 CPU==**; - **自旋锁**(Spinlock):加锁失败后,线程会 **==忙等待==**,直到它拿到锁; - **读写锁**(Reader-Writer Lock):"读-读"可并发,"读-写" & "写-写" 互斥。 - **递归锁**(Recursive Mutex):允许同一线程**在递归调用中多次获取该锁**,**而每次加锁都需要有对应的解锁操作**。直到所有锁被释放,其他线程才能获取该锁。 > [!caution] 线程调度由操作系统决定 > > **线程调度**由操作系统的调度器控制, 而调度器的行为可能因为多种因素而有所不同,包括系统当前负载、线程优先级等。 > > **即使 `t1` 线程先被创建和启动,也不能保证其会先于 `t2` 线程获得互斥锁**。 > 即便两个线程几乎同时达到等待锁的时间点,也是**由操作系统来决定哪个线程先获取锁**。 > [!NOTE] 锁不能保证获取顺序!当一把锁释放后,**所有等待该锁的线程共同竞争**,最先申请锁的线程不一定能获取! > > 如果**多个线程均阻塞着等待某个锁**,则在这一锁被释放时,**任何==等待线程==都将有机会竞争锁**,最终谁能获得该锁取决于操作系统的调度。 <br> ## 互斥锁 「**互斥锁**」:最基本的锁,**确保同一时刻==只有一个线程能够访问共享资源==**。当一个线程获得互斥锁后,**其他线程必须等待,直到锁被释放**。 **使用场景**:需要确保**只有一个线程访问共享资源**时。 ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-9A5EC13D2162F0522AE51EAD8FF66D4E.png|476]] ```cpp std::mutex mtx; void thread_function() { mtx.lock(); // 获取锁 // 临界区代码 mtx.unlock(); // 释放锁 } ``` <br> ## 自旋锁 「**自旋锁**」:**当一个线程尝试获取锁时,如果锁已经被占用,==线程会保持活跃、不断地轮询检查锁的状态,直到获取锁为止==,而不会进入睡眠**。 > [!info] "自旋" 与 "忙等待" 的含义 > > - **自旋**(spin)是指线程在等待获取锁的过程中,不进入休眠状态,**保持活跃**(占用 CPU),**持续地反复循环检查锁的状态,直至获取到锁为止**。 > - **忙等待**(busy waiting)—— 持续检查一个变量,直到某个值出现为止。 > [!caution] 在单核 CPU 上,需要抢占式调度器(**即不断通过时钟中断一个线程,运行其他线程**),否则一个自旋线程永远不会主动放弃 CPU **适用场景**:适用于**临界区执行时间非常短的场景**——"**自旋" 比让线程睡眠后再唤醒开销更低的情况**。 ```cpp std::atomic_flag lock = ATOMIC_FLAG_INIT; void thread_function() { while (lock.test_and_set(std::memory_order_acquire)) { // 自旋等待锁释放 } // 临界区代码 lock.clear(std::memory_order_release); // 释放锁 } ``` <br> ## 读写锁 > 读写锁也称 "**共享互斥锁**"(shared-exclusive lock) 「**读写锁**」:**==允许多个线程同时读取==共享资源**,**但==只允许一个线程写入==共享资源**。 **适用场景**:适用于**读多写少**的场景,如缓存、配置文件读取等。 特点如下: - **读可并发**,而 **写-写互斥** - **读-写互斥**:读写互斥,不能同时进行。 - "**==读锁==**" 必须等 "**==写锁==**" 释放后才能获取 - "**==写锁==**" 必须等==所有 "**读锁**"== 释放后才能获取 > [!NOTE] C++中的 `std::shared_mutex` 即用作读写锁 ```cpp std::shared_mutex rw_lock; void read() { std::shared_lock<std::shared_mutex> lock(rw_lock); // 共享锁,多个线程可以读 // 读操作 } void write() { std::unique_lock<std::shared_mutex> lock(rw_lock); // 独占锁,写操作互斥 // 写操作 } ``` <br> ### "读优先锁" 与 "写优先锁" 根据实现的不同,有三两种类型: - **==读优先锁==**:当已有线程持有读锁时,其余**线程可继续获得读锁**,**==写锁==必须等到==所有读锁释放后==才能获取**。 * 特点:**读优先**,但可能**饿死写线程** * 实现思路: 当一个线程: * 1) 申请读锁时, 若**没有其它线程持有读锁**, 则 **请求"读写互斥锁"**; * 2) 释放读锁时, 若其**是最后一个"读锁**", 则**释放"读写互斥锁"**; * 3) 请求写锁时 => **直接竞争** "读写互斥锁" * 4) 释放写锁时 => **直接释放** "读写互斥锁" - **==写优先锁==**:当已有线程持有读锁,而**写者线程==请求写锁被阻塞等待==时,将==阻塞后续所有线程获取读锁==**。 - 特点:**写优先**,但可能**饿死读线程** - 实现思路: - 1) 当一个线程**申请读锁**时, 若存在运行or等待中的写者, 则其阻塞 => 等待获取"读锁". * 2) 当一个线程**释放"最后一个"读锁**时, 唤醒"等待写锁"的线程. * 3) 当一个线程**申请写锁**时, 若存在运行中的读者或者其他写者, 则阻塞 => "等待获取"写锁" * 4) 当一个线程**释放写锁**时, "**==优先"唤醒"等待写锁的线程==**", 其次唤醒所有 "等待读锁的线程". - **==公平读写锁==**:用队列把**获取锁的线程排队**,不管是**写线程还是读线程都按照==先进先出的原则==加锁**,读线程仍然可以并发,也不会出现「饥饿」的现象。 ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-419EB0FDEB255857B45B6A18F0282BD1.png|1016]] <br> ## 递归锁 「**递归锁**」:允许同一线程**在递归调用中多次获取该锁**而不会发生死锁/阻塞,**每次加锁都需要有对应的解锁操作**。直到所有锁被释放,其他线程才能获取该锁。 适用场景:在**递归函数或多个方法调用**中,某个线程需要**多次进入同一个临界区**。 ```cpp std::recursive_mutex rmtx; void recursive_function(int depth) { rmtx.lock(); if (depth > 0) { recursive_function(depth - 1); } rmtx.unlock(); } ``` <br><br><br> # 条件变量 > ❓<font color="#c00000">什么是条件变量?</font> 条件变量:用于 "**线程之间==等待和通知==**" 的同步机制,**使线程在等待某个条件成立时==挂起进入休眠状态==**,而**在条件满足时被唤醒继续执行**,需要与 **==互斥锁==** 一起使用。 使用场景:**生产者-消费者**模型、**任务队列**。 > [!note] 条件变量机制下,线程在==等待条件成立时休眠,不占用 CPU==,直到被唤醒。 > > 如果不使用条件变量,也可以采用 "**自旋**" 的方式,例如在 `while(cond == False)` 循环里持续检查直至 `cond` 为 `true`,但自旋会使得**该线程浪费 CPU 时间**。 <br> ## 条件变量机制 > ❓<font color="#c00000">条件变量的用法是什么?</font> 要素: - **互斥锁**:用于**保护对 "条件" 的访问** - **条件状态**:等待线程**所等待成立的特定条件**,例如某个共享变量的值是否为特定值,或某个谓词函数是否为真。 - **条件变量**:用于提供 "**等待/唤醒**" 这一线程间同步机制的 **API**,其标识了一个 "**等待队列**"; 1. 在 "条件" 不满足时,**让线程阻塞等待**; 2. 在 "条件" 满足时,**唤醒等待中的线程**。 两过程: - **等待**:当一个线程**检查发现条件不满足**时,**==进入休眠==状态等待**(如调用 `pthread_cond_wait(&cv, &mutex)`)。 - 阻塞等待期间,该线程会 **==释放与条件变量相关的互斥锁==**,从而允许其他线程获取锁并**修改共享数据**。 - 被唤醒后(从`pthread_cond_wait()` 返回),**该线程==重新获得锁==**。 - **通知**:当某个线程**改变了共享数据的状态**后,通过条件变量来 **==通知/唤醒==** 等待的线程 - 如调用`pthread_cond_signal(&cond)` 或 `pthread_cond_broadcast(&cond)` 进行唤醒 > [!quote] > ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-1CB023178E624FD62F998041AFBFC439.png|442]] <br> ## 注意事项 🚨 > [!caution] **使用条件变量进行 "等待&通知"(调用 `wait` 与 `signal` ) 时都必须==持有互斥锁==** > > - 对于 "**进行等待**" 的线程,要保证 "**检查条件**" 的操作是原子性的。 > - 对于 "**进行通知**" 的线程,要保证 "**改变共享数据**" 的操作是原子性的。 > [!caution] 阻塞等待的线程可能被 **==虚假唤醒==**!必须使用 `while()` 来**检查条件**,而不是 `if()`, > > 等待线程可能受 OS 调度影响而被 "**唤醒**",即被唤醒时条件并不一定满足,因此**必须外套 `while` 在其被唤醒后再次判断**。 > > 例如在生产者-消费者模式中,若存在多个消费者,则**某个消费者被唤醒时**,可能缓冲区中数据已被其他消费者用掉! > > 示例: > > ```c > while (!ready) { // 防止虚假唤醒——唤醒后再次进行条件判断 > pthread_cond_wait(&cond, &mutex); > } > // 条件满足后继续执行 > ``` > > ^6hqcq5 <br> ## 使用示例 #### C++ API 示例 ```cpp std::mutex mtx; std::condition_variable cv; bool ready = false; void wait_thread() { std::unique_lock<std::mutex> lock(mtx); cv.wait(lock, []{ return ready; }); // 等待ready为true // 临界区代码 } void signal_thread() { std::lock_guard<std::mutex> lock(mtx); ready = true; cv.notify_one(); // 唤醒等待的线程 } ``` #### POSIX API 示例 ```c #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include <assert.h> int done = 0; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 通知线程 void* thr_exit(void* arg) { int rc; rc = pthread_mutex_lock(&mutex); assert(rc == 0); done = 1; rc = pthread_cond_signal(&cond); assert(rc == 0); rc = pthread_mutex_unlock(&mutex); assert(rc == 0); printf("thr_exit() has been called.\n"); return NULL; } // 等待线程 void* thr_join(void* arg) { int rc; rc = pthread_mutex_lock(&mutex); assert(rc == 0); // 检查共享变量done, 需要加锁. while (done == 0) { // 当done为0时, 线程进入休眠等待状态, 释放互斥锁, 等待被唤醒. rc = pthread_cond_wait(&cond, &mutex); assert(rc == 0); } rc = pthread_mutex_unlock(&mutex); assert(rc == 0); printf("thr_join() has been called.\n"); return NULL; } int main() { pthread_t p1, p2; int rc; rc = pthread_create(&p1, NULL, thr_exit, NULL); assert(rc == 0); rc = pthread_create(&p2, NULL, thr_join, NULL); assert(rc == 0); pthread_join(p1, NULL); pthread_join(p2, NULL); pthread_mutex_destroy(&mutex); return 0; } ``` <br><br><br> # 信号量 > ❓ <font color="#c00000">什么是信号量?</font> **信号量**(Semaphore)是用于**控制访问共享资源的==并发线程或进程数量==** 的同步机制。 信号量可视作一个 "**==计数器==**": - **值为正**时:表示 "**==可用的共享资源数量==**"'; - **值为负**时,表示 "**==阻塞等待的线程/进程个数==**"。 ## 信号量的机制 信号量提供了两个基本的**原子操作**:**==P 操作==**、**==V 操作==** - **P 操作**:**递减信号量** & **==阻塞等待==** (API示例:`sem_wait()`) - 信号量计数**减 1**,表示**获取了一个资源**。 - 若信号量减 1 后为负,则**该线程/进程进入==休眠等待状态==**(进入**等待队列**),等待该信号量 `+1` 后被唤醒。 - **V 操作**:**增加信号量** & **==通知==** (API 示例:`sem_post()`) - 信号量计数**加 1**,表示**释放了一个资源**。 - 若有线程/进程在**该信号量对应的等待队列**中,则 **==唤醒一个等待线程/进程==**,使其继续执行。 进程/线程**访问一个共享资源**前,需执行 **P 操作** 检查是否有可用的共享资源,若有则减少信号量值,若无则阻塞。 进程/线程**不再占有一个共享资源**时,需执行 **V 操作** 增加信号量值,并唤醒一个正在等待资源的线程。 > [!Info] 信号量的实现伪代码(OS 确保 P、V 操作为 "==**原子操作**==" ) > ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-4E3EA2D608E5F0FEA64DDF5E8867421C.png|370]] > <br> ## 信号量的用途 根据信号量 "**初始值**" 的不同,有两种使用场景: - **二值信号量**(Binary Semaphore) - 其计数器只能为 "**0 或 1**",可用作 **互斥锁**(初值为 1)、**条件变量**(初值为 0); - **计数信号量**(Counting Semaphore) - 其计数器为 "**任意非负整数**",用于 **==控制同时访问某一共享资源的线程/进程数量==**。 ## 使用示例 #### 用作互斥锁 **信号量==初始化值为 1==**,调用 `sem_wait()` **将计数降为 0 相当于获取锁**。 > [!NOTE] 信号量用作 "互斥锁" [^1] > ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-C4AE9A7B84F4A4887A61727CC74084F4.png|559]] ```c sem_t m; sem_init(&m, 0, X); // initialzie semaphore to X; what should X be? sem_wait(&m); // critical section here sem_post(&m); // 销毁 sem_destroy(&sem); ``` #### 用作条件变量 > [!NOTE] 信号量用作 "条件变量" [^1] > > ![[_attachment/02-开发笔记/05-操作系统/并发相关/同步原语.assets/IMG-同步原语-D18B4D509084B42C6F1B3AE0C1DE0CCB.png|476]] > **信号量==初始化值为 0==**,线程 A **调用 `sem_wait()` 时进入等待**,**等到其他线程 B 调用 `sem_post` 将值+1 时进行唤醒**。 ```c #include <semaphore.h> #include <pthread.h> #include <stdio.h> #include <assert.h> // 信号量用作条件变量 sem_t s; void* thread_func(void* arg) { printf("thread run\n"); sem_post(&s); // 信号量加1 return NULL; } int main() { sem_init(&s, 0, 0); // 信号量s用作条件变量, 初始化为0. printf("main:begin\n"); // 线程函数. pthread_t p; int rc = pthread_create(&p, NULL, thread_func, NULL); assert(rc == 0); sem_wait(&s); // 信号量减1, 主线程等待被子线程唤醒. 这里效果等价于pthread_join(). printf("main:end\n"); sem_destroy(&sem); } ``` <br><br><br> # 原子操作 「**原子操作**」是指 "**==不可分割==的线程安全操作**",即对任意线程可见而言,**该操作要么完全完成,要么完全没做**。 原子操作一旦执行,直至结束,**期间不会被其他线程或进程打断**,故支持在多线程并发环境下 **==无锁地==对共享数据进行安全修改**。 > [!NOTE] "原子操作" 的优点 > > 实际上,采用 "**互斥保护**" 就可以将多个操作合并,实现原子化。 > **"原子操作" 的优点在于其内部由 "==原子指令==" (硬件层面的 CPU 指令)直接实现**,无需锁,故**省略了申请/释放锁所带来的开销**。 > [!NOTE] 原子操作是实现 **无锁数据结构(Lock-Free Data Structures)** 的基础 > [!info] C11 标准中引入的头文件 `<stdatomic.h>`, C++11 标准引入的 `<atomic>` 头文件中均提供了**一套线程安全的原子操作 API**。 > 参见 [[02-开发笔记/01-cpp/多线程并发相关/cpp-多线程&同步#C++ 原子操作|cpp-多线程&同步: C++原子操作]] > [!caution] 非原子操作可能导致的数据错杂 > > 假设对 C/C++ 中的一个**大型结构体(包含多个数据成员)赋值**,**若该赋值操作是无锁的**,由于其**并非 "原子操作"**,因此可能在**线程 A 赋值进行到一半时**,**另一个线程读取结构体值**,就得到了 "**只有部分数据成员被赋予新值**" 的数据混乱结果。 <br><br> ## CAS (比较并交换) **比较并交换**(Compare-And-Swap,**==CAS==**)是一种**原子操作**,用以**在变量等于期望值时,将其更新为指定的新值**。 > [!note] CAS 的功能: **比较变量的==当前值与预期值==,如果它们相等,则将数据==更新为新的值==**,否则什么都不做。 - 优点:**无锁开销,无线程上下文切换的开销,无阻塞**,可提高并发性能。 - 缺点: - **==ABA问题==**:某些场景下,**变量可能先从A变为B,随后又变回A**,而 **CAS 操作无法区分这种情况**,可能会误认为数据没有变化。 - **高自旋成本**:如果CAS操作频繁失败,导致线程不断自旋重试,可能会消耗大量的CPU资源 ### CAS 功能 > [!info] C++标准库中的 `std::atomic` 提供了 `compare_exchange_strong` 与 `compare_exchange_weak` 两种 CAS 原子操作函数 ```cpp // CompareAndSwap的伪代码 int CompareAndSwap(int* address, int& expected, int new_value) { int actual = *address; if (actual == expected) { *address = new_value; } else { expected = actual; } return actual; // 无论指定内存地址的当前值是否等于预期值, 都返回其旧值. } ``` ### ABA 问题 「ABA 问题」:CAS 操作**只能检查变量值是否与期望值一致**,但 **==无法检测变量值在此期间是否发生过变化==**——**是否被修改后又改回了原值**。 > [!example] ABA 问题说明 > > - 一个变量初始值为`A`,线程 1 读取 `A` 之后作为期望值,并准备**稍后执行 CAS 操作**。 > - 另一个线程 2 将变量值从 `A` 修改为 `B`,随后又改回了 `A`; > - 线程 1 执行 CAS 操作,**检查发现变量值仍然为 `A`,故 CAS 操作成功,变量被更新为新值**。 > > 由于线程 1 不知道变量曾经被修改过,因此可能导致逻辑错误。 > > 例如,在**基于指针的 "无锁栈**" 中,**指针的 ABA 问题**可能导致错误逻辑: > 假设初始时栈中节点为 `[Node1<-Node2<-Node3]`,**栈顶指针 `head` 指向 `Node3`**。 > > - 线程 A 调用 `pop()`,该函数中首先通过`load` **读取栈顶元素值**后,**即将调用 `CAS` 令 head 指向 Node2**。 > - 线程 B 此时**先将 Node3 出栈并释放**,随后又**将一个新节点 Node4 压入**,由于内存复用,**新分配的 ==Node4 地址与之前的 Node3 地址相同==**,此时栈顶指针指向 Node4,栈内元素为 `[Node1<-Node2<-Node4]`。 > - 线程 A 执行 CAS 操作,**判断得到==栈顶的 Node4 地址与预期值 Node3 地址相等==**,认为 "**栈顶节点未变化过**",故**弹出 Node4 并释放其内存**。 > > 然而,**对线程 B 而言,Node4 是其压入并需要使用的节点,当线程 B 再次访问 Node4 时就将发生错误**。 > #### 解决 ABA 问题的方法 可引入如下机制,检测变量的修改历史: 1. **使用「版本号」**:**将一个版本号与变量值绑定**,CAS 操作不仅检查变量值,还检查版本号是否匹配; 2. **使用「指针标志位」**:在 64 位系统中,指针通常只使用低 48 位地址,高 16 位可以作为标志位,可以**利用指针的高位存储标志位(tag),每次修改时更新标志位以检测变化**。 <br><br> # 参考资料 # Footnotes [^1]: 《UNIX 网络编程》(P176)