# 纲要
> 主干纲要、Hint/线索/路标
- **线程的创建与管理**:创建、启动和管理线程
- **线程间同步**:线程之间如何协作,保护共享数据,避免条件竞争;
- 锁/互斥量(`std::mutex`)
- 条件变量(`std::condition_variable`)
- 异步任务( `std::future<T>`,`std::async()`,`std::promise<T>`,`std::packaged_task<T>`)
- 原子操作(`std::atomic` )
- 信号量(c++20)
- 屏障(c++20)
%%
# Q&A
#### 已明确
#### 待明确
> 当下仍存有的疑惑
**❓<font color="#c0504d"> 有什么问题?</font>**
%%
<br>
# C++ 标准对并发的支持
- C++11 引入了对多线程的支持,是 C++标准**首次提供原生语言层面的多线程支持**(以 Boost 线程库为蓝本)。
- 规定了**内存模型**:引入了线程存储持续性 `thread_local` ;
- 引入了用于**线程管控**、**保护共享数据**、**同步线程间操作**、**底层原子操作**的新类。
- 规定了**原子操作的语义**,提供了全方位的**原子操作库**,能直接单独操作每个位、每个字节。
- C++14 引入了用于**保护共享数据的新互斥**:读写锁,`std::shared_mutex`;
- C++17 增添了一系列适合新手的 **"并行算法"函数**。
> [!quote]
>
> C++标准库的设计目标是 "**高效实现**" 与 "**低抽象损失**" ——假定某些代码采用了标准库所提供的工具/接口,如果将其改为直接使用底层 API,不会带来性能增益,或者收效甚微。
>
<br>
# C++ 多线程&同步 头文件总结
> 参见[^1] [^2] [^3]
在 C++11 之前,编写**多线程程序**只能依赖于:
- **平台特定 API**:POSIX 多线程 C API、 Windows 多线程 API 等
- **第三方库**:例如 Boost 库
C++11 标准引入了**多线程并发以及同步原语**的支持,从此能够**基于 C++线程库本身**进行开发。
| | 引入标准 | | 说明 |
| ---------------------- | ----- | ----------------------- | --------------------------------------------------------------------- |
| `<thread>` | c++11 | 线程库 | 定义了 `thread`, `jthread` 类,以及 `yield()`, `get_id()`, `sleep_for` 等函数 |
| `<mutex>` | c++11 | 互斥原语 | 定义了不同的非共享互斥量,以及 `call_once()` 等。 |
| `<condition_variable>` | c++11 | 条件变量 | 定义了 `condition_variable` 和 `condition_variable_any` 类 |
| `<atomic>` | c++11 | 原子操作 | 定义了原子类型,`atomic<T>` 以及原子操作,用于 "无锁编程"。 |
| `<future>` | c++11 | 异步计算原语 | 定义了 `future`,`promise`,`packaged_task` 和 `async()` 等,用于 "异步任务以及结果获取"。 |
| `<shared_mutex>` | c++14 | 共享互斥原语 <br>(共享锁 & 独占锁) | 定义了 `shared_mutex`, `shared_lock`,`unique_lock` 等,可用作 "读写锁"。 |
| | | | |
| `<semaphore>` | c++20 | 信号量 | 定义了 `binary_semaphore` 以及 `counting_semaphore` 类,二元信号量以及一般信号量。 |
| `<barrier>` | c++20 | 屏障原语 | 定义了 `barrier` 类 |
| `<coroutine>` | c++20 | 协程 | 定义了编写协程所需的所有功能。 |
| `<stop_token>` | c++20 | `std::jthread` 的停止令牌 | 定义了 `stop_token`、`stop_source`、`stop_callback` 等 |
> [!NOTE] 参见 [^3] (附录 D)
> ![[_attachment/02-开发笔记/01-cpp/多线程并发相关/cpp-多线程&同步.assets/IMG-cpp-多线程&同步-95A089FF5923D55E6453FFE2355218D7.png|850]]
<br><br><br>
# C++ 线程库
C++标准库头文件 `<thread>` 中提供的 **`std::thread` 类** 以 "**面向对象**" 的方式提供了对多线程的支持,
将**一个 `std::thread` 实例对象**与一个 "**线程函数**" 相关联,支持以下线程管理:
- **发起线程**;
- **识别线程**;
- **等待 or 分离线程**;
- **线程归属权转移**(从一个 `std:thread` 对象转换到另一个)
**==无法从线程直接返回值==**,但可以运用 `future` 通过别的方式返回结果。
> [!caution] `std::thread` 类本身没有提供获取 "**线程函数返回值**" 的接口
>
> 不同于 POSIX API 中的 `pthread_join(tid, (void**)&res))` 可以获取线程函数返回值。
> 在 C++ 中只能通过**异步任务机制** `std::future` 来获取线程函数返回值。
<br>
## 线程运行
新线程通过**构建 `std::thread` 实例对象而启动**。
**每个 `std::thread` 类实例对象关联一个独立线程**,当**构造一个该类实例**时,即表示**启动/运行一个新线程**。
`std::thread` 的构造函数接受一个 "**可调用对象**" 作为线程的**起始函数**(initial function),
新线程将执行该==**线程函数**==,当函数返回时则线程随之终结。
> [!info] 应用进程本身作为一个**主线程**,其**起始函数是 `main()` 函数**。
##### 使用示例
```cpp title:multi_thread_exam.cpp
#include <iostream>
#include <thread>
#include <unistd.h>
using namespace std;
void ThreadFunc1() {
sleep(2);
std::cout << "Hello from ThreadFunc1\n";
}
void ThreadFunc2() {
std::cout << "Hello from ThreadFunc2\n";
sleep(4);
}
int main() {
// 每个`std::thread`类对象对应一个线程, 以线程起始函数作为参数(参数类型为函数指针).
// 创建两个新进程
std::thread t1(ThreadFunc1); // 创建一个线程变量t1
std::thread t2(ThreadFunc2); // 创建一个线程变量t2
// 主线程中调用线程对象的`.join()`方法, 阻塞并等待该线程结束.
// 等待两个线程完成
t1.join(); // 主线程等待线程对象t完成
t2.join();
return 0;
}
```
### 参数传递
`std::thread` 类内部的参数传递机制类似于 `std::bind()`,其构造函数:
- **首项**参数:
- 接受任何 "**可调用对象**"(函数指针、类成员函数指针、仿函数、lambda、bind、`std::function` 包装等)作为其首项参数,即线**程的 "==入口/起始函数=="**。
- **其余**参数:
- 若首项参数是 "**成员函数指针**",则第二项参数应当是 **"类的实例对象"**;
- 否则,其余参数全都为**传递给 "==线程函数==" 的参数**。
### 获取线程 ID
每个线程都具有唯一的线程 ID,**类型为 `std::thread::id`**,支持**直接比较和拷贝操作**。
(因此也可用作关联容器的键值,或用于排序)
获取线程 ID 的两种方式:
- 通过 `std::thread` 对象的 `get_id()` 方法,可获取**该线程对象==关联线程**==的线程 ID。
- 通过 `<tread>` 头文件中的 `std::this_thread::get_id()` 函数,可获取 "**==当前线程==**" 的线程 ID。
线程 ID 的值也可直接写到输出流(`std::cout`),但其值的内容本身不具备语义层面的意义。
### 等待线程完成 `join()`
`std::thread` 实例的成员函数 `join()` ,表示**将阻塞 "等待"该线程运行完成**。
对于某个确定的线程,其关联的线程对象的 `join()` 方法**只能被调用一次**。
当调用过 `join()` 后,线程对象的 `joinable()` 方法将返回 `false` 。
### 分离线程 `detach()`
`std::thread` 对象的 `detach()` 方法被调用后,将**切断该==对象==与==对应线程==之间的关联**,令**线程独立地在后台运行**。 **分离出的线程将持续运行、直到最终从线程函数返回时终止**,即使 `std::thread` 对象可能早已经被销毁。
被分离出的线程也称 "**==守护线程==**"(daemon thread),其==**归属权==和==控制权==将被转移给 "C++运行时库(Runtime Library)"**,由此保证一旦该线程结束,与之关联的资源都会被正确回收。
> [!caution]
>
> 使用 `detach()` 分离线程后,**如果新线程的函数中持有==指针或引用==,指向 ==旧线程的局部变量==**,则当旧线程结束后,子线程 **试图访问主线程中==已销毁的对象== 将是未定义行为**。
<br><br>
## `std::thread` 对象与 "关联线程" 的生命周期
> [!caution] 在 `std::thread` 对象被销毁前,必须确保已经调用 `.join()` 或 `.detach()` 成员函数
在构建 `std::thread` 对象创建线程后,**必须==在该 `std::thread` 对象被析构前====(例如离开作用域前)调用下列成员函数之一**, 以明确如何处理其 "**关联线程**":
- 1)**调用其 `.join()` 方法,表示==将阻塞 "等待"该线程结束**==;
- 2)**调用其 `.detach()` 方法,表示让==该线程独立运行**==——当 `std::thread` 对象被销毁后,线程仍将继续运行,**直至线程函数返回时才结束线程**。
如果 `std::thread` 对象在将被销毁时仍未执行上述任一操作,则其 **==析构函数==将自动调用 `std::terminate()` 终止整个进程**。
> [!caution] 调用 `join()` 或 `detach()` 前,应确保通过 `.joinable()` 方法检查 "是否有与该对象关联的执行线程"
```cpp
std::thread t1(Func2);
std::thread t2(Func1);
if (t1.joinable()) t1.join();
if (t2.joinable()) t2.detach();
```
<br><br>
## 线程函数的参数传递方式
创建 `std::thread` 对象时,提供给 **"线程函数"** 的**所有参数** 有两种传递方式:
- (1)**默认** 以 "**==按值传递==**" 的方式 "**==拷贝==**"(对左值) 或 "**==移动==**"(对**右值**) 到 **线程的内部存储空间(独立的栈)**,<br>随后,在线程上下文中的这些**参数副本**(为 **==临时对象==,右值**)将被用来**调用==线程函数==**。
- (2)当 **通过 `std::ref()` 或 `std::cref()` 包装时**,则将会进行 "**==引用传递==**" 。 <br>(`std::thread()` 传参的内部机制类似于 `std::bind()`)
> [!caution]
>
> 若**线程函数的形参为 "==左值引用==" 类型**,则向 `std::thread` 构造函数**直接传递 "左值"** 将会报错,
> 原因在于:**传递给 `std::thread` 的参数默认会被 "==拷贝==" 到线程的内部存储空间**,而**拷贝所产生的 "临时对象"是一个==右值==,无法绑定到线程函数期望的==左值引用==上**。
>
> 解决方案:当线程函数期望一个 "**左值引用**" 参数时,传递给 `std::thread` 的**实参**必须 **使用 ` std::ref() ` 包装**,以明确指示进行 "**==引用传递==**" 而非 "值传递"。
> [!quote]
> By default, the arguments are ==copied== into internal sotrage, where they can be accessed by the newly created thread of exectution, and **then passed to the callable object** or function **as ==rvalues== as if they were ==temporaries==**.
> [!example] 示例:向 `std::thread` 传递"引用"
>
> ```cpp title: pass_param_to_thread_func. cpp
> void ThreadFunc1 (int& value) { // 接受一个左值引用
> cout << "This is a left value reference parameter: " << value << endl;
> }
>
> void ThreadFunc2 (int&& value) { // 接受一个右值引用
> cout << "This is a right value reference parameter: " << value << endl;
> }
>
> int main () {
> int x = 99;
> // 传递左值引用
> // std::thread t1(ThreadFunc1, x); // error: 无法将临时副本(右值)绑定到左值引用
> std::thread t1(ThreadFunc1, std::ref(x)); // 需要使用 `std::ref`
> // 传递右值引用
> std::thread t2(ThreadFunc2, x); // 直接传递右值引用
>
> t1.join();
> t2.join();
> }
> ```
>
>
>
> [!caution]
>
> 对于需要"分离"的线程,**应当避免传递 "指针" 作为线程函数的参数**,以免指针的生命周期先结束,从而**导致线程内的指针成为 "悬空指针**"。
>
> ```cpp
> void ThreadFunc (int i, std::string const& s) { // 一个线程函数
> std::cout << "Hello from ThreadFunc1: " << i << " " << s << std::endl;
> }
>
> void oops () {
> char buffer[1024] = "Hello World Thread!";
> // 注意下面两种写法:
> // 1. 直接传递 buffer 指针
> // 2. 先用 buffer 指针实例化一个 std:: string 对象, 再传递该对象
> // 两种写法的区别在于, 第一种写法可能会导致错误, 第二种写法是正确的.
> // 原因在于:
> // 如果直接传递了 buffer (char*指针), C++会首先"拷贝"该指针到线程的内部存储空间中,
> // 然后再在线程的上下文中调用 ThreadFunc 函数. 直到调用线程函数时, char*指针才被隐式转换为 std:: string 对象.
> // 同时, 由于通过 `t.detach()` 分离了线程, 因此在构建新进程后、调用 ThreadFunc 之前, 指针 buffer 的生命周期可能已经结束,
> // 于是将可能导致错误————视图访问已经被销毁的内存并隐式转换为 std:: string 对象.
> // 正确的做法则是方式二, 先显式地由 buffer 创建一个 std:: string 对象, 再将该对象传递给线程函数.
>
> // std:: thread t (ThreadFunc, 3, buffer); // 可能导致错误
> std::thread t(ThreadFunc, 3, std::string(buffer)); // 正确. 避免悬空指针.
> t.detach(); // 分离线程
> }
> ```
>
>
<br><br>
## 线程归属权转移
每个 `std::thread` 实例关联着一个执行线程,**该线程的归属权可在不同 `std::thread` 实例间转移**,
通过 **`std::thread` 对象间的 "==移动==" 操作**(移动构造/移动赋值)实现。
转移归属权之后,**原始 `std::thread` 对象会处于一个有效但"空"的状态**,即它**不再关联任何执行线程**。<br>对于这样的对象,`joinable()` 成员函数将返回 `false`。
> [!NOTE] `std::thread` 类仅支持移动语义,不支持拷贝语义
> [!caution] 线程归属权要求
>
> - 对于**任一特定的执行线程**,任何时候**只有==唯一的 `std::thread` 对象==与之关联**;
> - 对于**一个 `std::thread` 实例对象**,其在任何时刻**只能与一个执行线程关联**。
>
> 如果尝试将**另一个线程的归属权**转移给一个 **==已关联有执行线程==的 `std::thread` 实例对象**,
> 则整个进程将触发 `std::terminate()` 终止。
>
```cpp title:move_thread_ownership.cpp
/** `std::thread`类只支持移动语义, 可通过移动语义实现对"线程归属权"的转移.
* 移动构造函数
* 移动赋值运算符
*
* 注: std::thread 不支持拷贝语义.
*/
int main() {
std::thread t1([](){
cout << "Hello from Lambda" << endl;
});
// 1) 通过`std::thread`类的"移动构造函数"进行线程归属权的转移
std::thread t2(std::move(t1)); // `std::move()`将`std::thread`实例t1变为右值;
// 2) 通过`std::thread`类的"移动赋值"实现线程归属权的转移
std::thread t3;
t3 = std::move(t2);
t3.join();
}
```
> [!example] 需要转移线程归属权的两个常见应用场景
>
> 1. 当前函数创建新线程并置于后台运行,需要将**归属权移交给当前函数的调用者**时;
> 2. 将当前函数下创建的线程的**归属权传入给某个函数调用**,由后者等待该线程结束;
>
> 示例:
>
> ```cpp
> void Func1() { cout << "Hello from Func1\n"; }
> void Func2() { cout << "Hello from Func2\n"; }
>
> void AcceptThread(std::thread t) {
> if (t.joinable()) t.join();
> }
>
> std::thread GetAThead() {
> return std::thread(Func1);
> }
>
>
> int main() {
>
> // GetAThead()返回一个线程对象, 转移其函数内部创建的线程的归属权.
> std::thread t1 = GetAThead();
>
> // 向 AcceptThread 函数传递一个线程对象, 移交线程的归属权给该函数.
> AcceptThread(std::move(t1)); // 通过`std::move()`将`std::thread`实例变为右值;
> AcceptThread(std::thread(Func2)); // 直接向函数传递一个临时对象(右值)
> }
> ```
<br><br>
## 利用 RAII 范式对 `std::thread` 对象进行再封装
> [!tip] 如何保证"新线程"在 "主线程退出前" 结束?
>
>
> 当程序中需要保证在 "新线程" **在主线程退出前结束**时,必须同时考虑出现异常的情况,即:
>
> - 在**主线程正常运行**时,确保会调用 `join()`;
> - 当**主线程出现==异常**==时,确保也会调用 `join()`;
>
> 最简单的方式是使用 `try/catch` 块:
>
> ```cpp
> try {
> // do_something_in_current_thread();
> } catch (...) {
> // 如果在主线程中抛出异常, 则确保子线程的 join()方法被调用.
> t1.join();
> t2.join();
> throw;
> }
> t1.join();
> t2.join();
> ```
>
> 更推荐的,更简洁、有效的实现方式是**利用==标准的 RAII 范式==**,**将主线程的任务逻辑封装为一个类,并在==该类的析构函数==中调用各个子线程的 `join()`**:
>
> ```cpp
> /** 使用 RAII 机制来确保线程的 join()方法被调用
> * ! 设计一个类, 该类的构造函数接受线程对象的"引用", 并作为其成员变量保存.
> * ! 在该类的析构函数中, 调用线程对象的 join()方法, 从而确保子线程的 join()方法被调用.
> */
> class ThreadGuard {
> public:
> explicit ThreadGuard(std::thread& t_) : t(t_) {}
> ~ThreadGuard() {
> // 检查: 可 join 时, 才进行 join
> if (t.joinable()) {
> t.join();
> }
> }
> // 禁止拷贝构造与拷贝赋值
> ThreadGuard(ThreadGuard const&) = delete;
> ThreadGuard& operator=(ThreadGuard const&) = delete;
>
> private:
> std::thread& t; // 子线程对象
> };
>
> void DoSomething1(int &i) { cout << "Hello from DoSomething1: " << i << endl; }
> void DoSomething2(int &i) { cout << "Hello from DoSomething2: " << i << endl; }
>
> int main() {
> int some_local_value = 88;
>
> std::thread t1(DoSomething1, std::ref(some_local_value));
> std::thread t2(DoSomething2, std::ref(some_local_value));
> // 利用 RAII 范式, 构建 ThreadGuard 类对象.
> // 每个线程对象对应一个 ThreadGurad 类实例对象, 确保线程的 join()方法在一定被调用.
> ThreadGuard g(t1);
> ThreadGuard g2(t2);
> }
> ```
>
>
>
<br><br><br>
# C++ 互斥锁
> 由头文件 `<mutex>` 提供
## 总结
C++标准库**头文件 `<mutex>`** 中为 "互斥原语" 提供了下列 API:
- (1)**互斥锁类**:
| 类 | 说明 |
| ----------------------------------------------- | ------------------ |
| `std::mutex` | 最简单的互斥锁,提供基本的互斥功能; |
| `std::recursive_mutex` | 可递归互斥锁 |
| `std::timed_mutex`、`std::recursive_timed_mutex` | 定时锁 |
- (2)**`std::lock()` 函数**——用于**同时安全地锁定两个及以上的互斥锁而不引起死锁**。
- (3)三个**类模版**,作为 "**==RAII 风格==的锁封装**"—— 与互斥锁配合使用,提供 "**基于对象生命周期**" 的锁管理:
| 类模版 | 说明 |
| ------------------------------ | ---------------------------------------------------- |
| `std::lock_guard<T>` | **==构造对象==时自动加锁,==析构==时自动解锁**;一旦锁定,不能手动解锁 |
| `std::unique_lock<T>` | 进一步支持 **"手动解锁" 和 "重新锁定"**,且提供了**延迟锁定、时间锁定、所有权转移**等功能 |
| `std::scoped_lock<...>`(c++17) | 构造时**加锁==多个==互斥量**(内部基于 "死锁避免算法"),析构时**释放所有锁**; |
- (4) **`std::once_flag` 类 & `std::call_once()` 函数**——用于多线程下需要 "**延迟初始化**" 的场景,**确保初始化或其他操作只执行一次**。
> [!caution] **不得向==锁所在的作用域之外==传递==指针和引用==,指向受保护的共享数据**
>
> 包括两种情景:
>
> - **避免向调用者==返回指针或引用==**,指向受保护的共享数据;
> - **避免在成员函数内==调用其他外部函数时传递指针或引用==**,指向受保护的共享数据。
>
> 当存在 "**游离的指针或引用**"(超出锁的作用域) 指向共享数据时,安全性则被破坏。
<br><br>
## 锁类
### `std::mutex` 基本的互斥锁
一个 `std::mutex` 对象代表一个**最基本的互斥锁**,**其在任何时候只能被一个线程锁定**。
当某个线程**调用该对象的 `lock()` 方法加锁**之后,其它线程**再尝试调用此方法加锁则该==线程会被阻塞==,直到锁被释放**。
```cpp
std::mutex mtx; // 一个全局互斥锁
void shared_function() {
mtx.lock(); // 上锁
// 访问或修改共享资源
// ...
mtx.unlock(); // 解锁
}
```
> [!example] 互斥锁使用示例
>
> ```cpp
> /** std::mutex 使用示例
> *
> * print_block 函数由一个全局互斥锁进行控制.
> * 在第一个线程运行print_block函数并调用`mtx.lock()`之后,
> * 当第二个线程运行print_block函数并调用`mtx.lock()`时将会阻塞,
> * 直到第一个线程调用了`mtx.unlock()`解锁.
> */
> std::mutex mtx; // 定义互斥锁
>
> void print_block(int n, char c) {
> mtx.lock(); // 上锁
> for (int i = 0; i < n; ++i) { cout << c; }
> cout << '\n';
> mtx.unlock(); // 解锁
> }
>
> int main() {
> // 由于加了锁, 因此 t1 和 t2 线程的输出不会交叉, 而是分别输出 50 个'*'和 50 个'