并发编程
多个任务的执行方式
- 顺序:一个接着一个
- 并行:同一时间执行多个任务(并行是并发的子集)
- 并发:多个任务在同一个重叠时间段执行
OpenMP
使用#prgma预处理指令可以将代码转变为并行处理
此时需要在编译时添加参数-fopenmp
e.g. 计算\(\pi\)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include<iostream>
using namespace std;
double cal(const long num_steps){
double step = 1.0/num_steps;
double sum = 0.0;
#pragma omp parallel for reduction(+:sum)
for (long i = 0; i<num_steps;i++){
double x = (i+0.5)*step;
sum+=4.0/(1.0+x*x);
}
return sum*step;
}
int main(){
cout << cal(10);
}其中第六行语句的意思:
omp parallel for表示将for循环每次循环转化为并行计算reduction表示用于reduce计算的变量与运算符局部变量x在每个线程中修改,不影响其他线程
sum是共享对象,不能在并行线程中直接修改,否则导致数据竞争
sum需要reduce操作,其中reduction表明了需要reduce操作的变量与运算符
STL并行算法库
使用<algorithm>与<numeric>库中的函数,可以简单实现并行
在支持并行的算法函数中,首个参数可以选择执行策略:
std::execution::sequenced_policy:顺序执行策略(实例:seq)std::execution::parallel_policy:并行执行策略(实例:par)std::execution::parallel_unsequenced_policy:多线程加向量化策略(实例:par_seq)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <algorithm>
#include <iostream>
#include <execution>
#include <vector>
int main(){
auto op = [](int v){std::cout << v << ";\n";};
std::vector<int> numbers{1,2,3,4,5,6,7,8,9};
std::cout <<"sequential for_each"<<std::endl;
for_each(std::execution::seq, numbers.begin(), numbers.end(), op);
std::cout <<"parallel for_each"<<std::endl;
for_each(std::execution::par, numbers.begin(), numbers.end(), op);
std::cout <<"parallel unseq for_each"<<std::endl;
for_each(std::execution::par_unseq, numbers.begin(), numbers.end(), op);
}以上没有对cout进行同步措施,会导致输出错误
线程
线程与进程
进程:程序执行过程,相互独立,操作系统为每一个进程分配一个执行控制块来控制进程
线程:进程中的执行过程,系统为每一个线程分配一个栈于线程控制块,线程间共享堆与全局数据
创建线程
使用<thread>类创建线程
thread(F&& f, Args&&... args);
- 第一个参数为线程的入口函数(线程函数),可以是各种可调用的对象
- 后面的Args为传递给可调用对象的实参
- 若第一个参数为非静态成员函数,则后面的第一个参数为调用这个成员函数的类对象地址
thread的返回值会被忽略,故要使用按引用传递的参数,或存储在类对象的数据成员中
用该函数初始化后会创建一个新线程,执行线程函数
线程只能移动构造或移动赋值,防止多个线程对象代表一个执行线程
.join()
该函数会使得当前线程执行结束后等待其他线程回会合,避免出现提前结束导致出现错误
这种等待状态称为阻塞,若对应线程提前结束,则join函数也会直接返回
调用函数时,该线程必须是joinable的,意味着该线程必须是正在执行或执行完毕但未合并的线程,可以使用.joinable()判断是否可合并
不能被合并的情况:
- 默认构造函数初始化的
- join()完成的
- detach()后的
- 被移动的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include<thread>
#include<iostream>
#include<array>
void greeting(int idx){
printf("Hello from thread %d\n", idx);
}
int main(){
std::array<std::thread, 10> workers;
for (int i = 0;i<10;i++){
workers[i] = std::thread(greeting, i);
}
for (auto &workers:workers){
workers.join();
}
}1
2
3
4
5
6
7
8
9
10
Hello from thread 0
Hello from thread 1
Hello from thread 2
Hello from thread 3
Hello from thread 4
Hello from thread 5
Hello from thread 6
Hello from thread 7
Hello from thread 8
Hello from thread 9Fork and join
可以使用线程实现并行
通过Fork and join模式:把要执行的数据分成n份,由n个线程并行处理自己的那部分数据,再进行汇总,n可以根据cpu核心设置
实现简单,但对于复杂计算,相同数据处理时间不同,导致有些线程很早处理完毕,有的需要更长时间结束
实现transform_reduce函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include
#include
#include
#include
template
void transform_reduce(Iter first, Iter last, UnaryOp transform, BinaryOp reduce, Result& res){
for (auto it=first;it numbers;
numbers.reserve(N);
for (int i = 0; i < N; ++i) {
numbers.push_back((double)i/N);
}
auto transform = [](double x) { return x * x; };
auto reduce = [](double x, double y) { return x + y; };
const int N_thraeds = 8;
std::vector workers;
std::array subResults={};
for (int i = 0; i::iterator,
decltype(transform),
decltype(reduce),double>,
low, high, transform, reduce, std::ref(subResults[i])
)
);
}
double result = 0;
for (int i = 0; i < N_thraeds; ++i) {
workers[i].join();
result = reduce(result,subResults[i]);
}
std::cout << "Result: " << result << std::endl;
return 0;
} Divide and conquer
使用分治将一个任务分成两个小任务,由两个线程完成,然后小任务再分成两个小任务,创建两个线程完成,直到不能分解
另外需要设计一个阈值,当线程过多时就不再创建,而是使用同步计算
为了让各个线程任务饱满,可以采用一半采用同步调用,一半分给另一个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include<thread>
#include<vector>
#include<array>
#include<iostream>
template<typename Iter, typename UnaryOp, typename BinaryOp, typename Result, std::size_t MinDist = 10>
void transform_reduce(Iter first, Iter last, UnaryOp transform, BinaryOp reduce, Result& res){
std::size_t distance = std::distance(first, last);
if (distance==1){
res = reduce(res, transform(*first));
}
else{
std::size_t half = distance / 2;
Iter middle = first+half;
Result r_result = 0;
Result l_result = 0;
std::thread l_thread(
transform_reduce<Iter, UnaryOp, BinaryOp, Result>,
first, middle, transform, reduce, std::ref(l_result)
);
std::thread r_thread(
transform_reduce<Iter, UnaryOp, BinaryOp, Result>,
middle, last, transform, reduce, std::ref(r_result)
);
l_thread.join();
r_thread.join();
res = reduce(res, reduce(l_result, r_result));
}
}promise & future
这两个类提供了线程之间简单的通信机制
通过promise来设置值或异常,通过future来获得值或异常
一对promise&future共享一个shared state(共享状态),是一个类对象,提供了在保证线程安全的情况下,设置和获取状态与结果的方法,使用shared_ptr指向该状态
常用于发送-接受模式中
promise
promise类模板提供了按值、按引用传递数据,以及仅传递信号三种版本
对应的成员函数:
- promise():构造函数,创建一个共享状态
- promise(promise&& x):移动构造函数,将被移动对象x的共享状态转移过去
- get_future():创建一个future对象,使用promise中的共享状态,只能调用一次
- set_value():设置共享状态中的值,并设置为就绪状态
- set_exception():设置共享状态的异常
注:为了保证一对一性,promise类删除了拷贝构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include<future>
#include<iostream>
void compute_pi(const long num_steps, std::promise<double>&& promise){
double step = 1.0/ num_steps;
double sum = 0.0;
for(long i = 0; i < num_steps; ++i){
double x = (i + 0.5) * step;
sum += 4.0 / (1.0 + x * x);
}
promise.set_value(sum * step);
}
void display(std::future<double>&& receiver)
{
double pi = receiver.get();
std::cout << "Computed value of Pi: " << pi << std::endl;
}
int main(){
const int N_steps = 100000000;
std::promise<double> promise;
auto receiver = promise.get_future(); // 调用获得future对象后就没什么用了
std::thread th1(compute_pi, N_steps, std::move(promise)); // 将promose对象转换为xvalue,作为实参换入
std::thread th2(display, std::move(receiver));
th1.join();
th2.join();
}future
future中的方法主要用于获取状态与结果
- get():获得结果,若对应结果没有处在就绪状态,那么这个调用会阻塞直到就绪
- wait():等待状态就绪,之后返回,此时调用get可以直接获得结果
- wait_for():等待指定时长,返回状态码
- wait_until():等待到指定时刻,返回状态码
- share():
状态码包括:
- ready:共享状态就绪
- timeout:超时
- deferred:共享状态中有延缓执行的函数(显式请求中才会调用的函数)
若需要让多个线程获得结果,需要shared_future,构建方法如下:
- 定义对象,使用promise的get_future()作为初始化参数
- 在原有future上调用方法.share(),此时原有不再使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include<future>
#include<iostream>
#include<sstream>
void compute_pi(const long num_steps, std::promise<double>&& promise){
double step = 1.0/ num_steps;
double sum = 0.0;
for(long i = 0; i < num_steps; ++i){
double x = (i + 0.5) * step;
sum += 4.0 / (1.0 + x * x);
}
promise.set_value(sum * step);
}
void display(std::shared_future<double> reveiver){
static std::mutex mu;
double pi = reveiver.get();
printf("Computed value of Pi: %f\n", pi);
}
int main(){
const int N_steps = 100000000;
std::promise<double> promise;
std::shared_future<double> shared_receiver(promise.get_future());
std::thread th1(compute_pi, N_steps, std::move(promise));
// 使用shared_future可以让多个线程共享同一个future对象
std::thread th2(display, shared_receiver);
std::thread th3(display, shared_receiver);
th1.join();
th2.join();
th3.join();
}互斥和锁
多线程中,当多个线程队同一个对象进行访问,并且至少有一个在写数据时,会产生数据竞争
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include<array>
#include<thread>
#include<iostream>
void inc(int &res){
for(int i = 0; i < 1000000; ++i){
res++;
}
}
int main(){
int counter = 0;
std::thread th1(inc, std::ref(counter));
std::thread th2(inc, std::ref(counter));
th1.join();
th2.join();
std::cout << "Counter: " << counter << std::endl;
}该程序创建了两个线程进行累加,但最后得到的结果都不是正确结果,且每次都不一样

这是一个正常的执行过程

若顺序是这样结果就不对了
互斥量
C++中所有运算默认为非原子操作,即该线程操作随时会被终端或被其他线程访问
mutex提供了一种同步机制,实现多个线程的安全访问
C++11开始,提供了std::mutex来实现互斥原语,是最常用的同步原语之一
有三个主要函数:
- lock():锁定互斥量,即获得互斥量,若已经被其他线程锁定,则会进入阻塞直到被解锁
- try_lock():尝试锁定互斥量,若成功锁定则返回true,否则返回false(不保证被其他线程锁定)
- unlock():解锁,释放互斥量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include<array>
#include<thread>
#include<iostream>
#include<mutex>
class counter{
public:
void inc(){
// 对要修改的资源进行保护
// 保证同一时间只有一个线程可以修改资源
// 即要么获得互斥量的锁,要么等待
counter_mutex.lock();
m_count++; // 临界区
counter_mutex.unlock();
}
int get(){
int temp;
counter_mutex.lock();
temp = m_count; // 临界区
counter_mutex.unlock();
return temp;
}
private:
int m_count = 0;
std::mutex counter_mutex;
};
int main(){
counter c;
auto increase = [](counter &c){
for(int i = 0; i < 1000000; ++i){
c.inc();
}
};
std::thread th1(increase, std::ref(c));
std::thread th2(increase, std::ref(c));
th1.join();
th2.join();
std::cout << "Counter: " << c.get() << std::endl;
}其中,被mutex保护的操作区域叫做临界区
为了避免未释放互斥量的问题,我们通常结合以下对象使用
- lock_guard():可以通过调用上锁后自动解锁,防止忘记解锁以及出现异常离开作用域时没有解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class counter{
public:
void inc(){
std::lock_guard lock(counter_mutex);
m_count++; // 临界区
}
int get(){
std::lock_guard lock(counter_mutex);
return m_count;
}
private:
int m_count = 0;
std::mutex counter_mutex;
};- unique_lock():更加灵活,可以随时指定锁定与解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include<array>
#include<thread>
#include<iostream>
#include<mutex>
class counter{
public:
void inc(int n){
std::unique_lock lock(counter_mutex, std::defer_lock);
while(n--){
lock.lock(); // 对要修改的资源进行保护
m_count++; // 临界区
lock.unlock();
}
}
int get(){
std::unique_lock lock(counter_mutex);
return m_count;
}
private:
int m_count = 0;
std::mutex counter_mutex;
};
int main(){
counter c;
auto increase = [](counter &c){
c.inc(1000000);
};
std::thread th1(increase, std::ref(c));
std::thread th2(increase, std::ref(c));
th1.join();
th2.join();
std::cout << "Counter: " << c.get() << std::endl;
}- timed_mutex:有等待时长的互斥
- try_lock_for():指定时长内获得mutex的所有权
- try_lock_until():指定时刻前获得mutex的所有权
返回true表明获得mutex,否则返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include<mutex>
#include<thread>
#include<iostream>
#include<chrono>
#include<sstream>
using namespace std::chrono_literals;
class TryDemo{
public:
void print(){
for (int i = 0; i<10; i++){
std::unique_lock lock(m_mutex, std::defer_lock);
if (lock.try_lock_for(100ms)) {
{
std::lock_guard guard(cout_mutex);
std::cout <<"["<<std::this_thread::get_id()<<"] "
<< "Got the lock, printing " << i << std::endl;
}
std::this_thread::sleep_for(100ms);
} // 尝试获取锁,
else{
std::lock_guard guard(cout_mutex);
std::cout <<"["<<std::this_thread::get_id()<<"] "
<< "Failed to get the lock, skipping " << i << std::endl;
std::this_thread::sleep_for(100ms);
}
}
}
private:
std::timed_mutex m_mutex;
std::mutex cout_mutex;
int m_count = 0;
};
int main(){
TryDemo demo;
auto print = [](TryDemo &demo){
demo.print();
};
std::thread th1(print, std::ref(demo));
std::thread th2(print, std::ref(demo));
th1.join();
th2.join();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[2] Got the lock, printing 0
[3] Failed to get the lock, skipping 0
[2] Got the lock, printing 1
[3] Failed to get the lock, skipping 1
[2] Got the lock, printing 2
[3] Failed to get the lock, skipping 2
[2] Got the lock, printing 3
[3] Failed to get the lock, skipping 3
[2] Got the lock, printing 4
[3] Failed to get the lock, skipping 4
[2] Got the lock, printing 5
[2] Got the lock, printing 6
[3] Failed to get the lock, skipping 5
[3] Got the lock, printing 6
[3] Got the lock, printing 7
[2] Failed to get the lock, skipping 7
[2] Got the lock, printing 8
[3] Failed to get the lock, skipping 8
[2] Got the lock, printing 9
[3] Failed to get the lock, skipping 9死锁
单一线程中多次锁定同一个互斥量会导致死锁
常见死锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include
#include
#include
class dosome{
public:
int f1(){
std::lock_guard lock(mtx);
count*=2;
f2();
return count;
}
void f2(){
std::lock_guard(mtx); // 已经在外层被锁定过了
count++;
}
private:
std::mutex mtx;
int count=0;
}; 此时所在线程会进入阻塞状态
我们使用`recursive的mutx优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include<iostream>
#include<mutex>
#include<thread>
class dosome{
public:
int f1(){
std::lock_guard lock(mtx);
count*=2;
f2();
return count;
}
void f2(){
std::lock_guard(mtx); // 已经在外层被锁定过了
count++;
}
private:
std::recursive_mutex mtx;
int count=0;
};当被锁定并获得互斥量后,当前线程再调用lock后仍会成功而不会阻塞,直到调用相同数量的unlock()才会被释放
多线程中
多线程中,导致死锁原因较多
- 锁定一个互斥量后没有释放,导致后面的线程获取该互斥量时进入阻塞状态
解决方法:使用RAII,即使用lock_guard等自动对象,在离开作用域后自动释放互斥量
- 同时对多个互斥量锁定解锁时,获得和释放顺序不同导致死锁
如以下代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include<iostream>
#include<mutex>
#include<thread>
#include <chrono>
using namespace std::chrono_literals;
class dosome{
public:
void f1(){
{
std::lock_guard lock1(mtx1);
std::lock_guard lock2(cout_mtx);
count++;
std::cout << "count: "<<count<<std::endl;
}
std::this_thread::sleep_for(1ms);
}
void f2(){
{
std::lock_guard lock1(cout_mtx);
std::lock_guard lock2(mtx1);
count--;
std::cout << "count: "<<count<<std::endl;
}
std::this_thread::sleep_for(1ms);
}
void calc(int n){
for (int i = 0;i<n;i++){
if (n%2){
f2();
}
else{
f1();
}
}
}
private:
std::mutex mtx1;
std::mutex cout_mtx;
int count=0;
};
int main(){
dosome d;
const int N1 = 10000;
const int N2 = 10001;
std::thread th1(dosome::calc, &d, N1);
std::thread th2(dosome::calc, &d, N2);
th1.join();
th2.join();
}这里f1与f2都对两个互斥量进行锁定,顺序不同
主函数设置两个线程分别为10000次,执行f1,10001次,执行f2
此时运行程序会在一定时间发生死锁:
线程1在某个时刻获得了mtx1,线程2在下一时刻获得了cout_mtx
接下来线程1需要获得cout_mtx,但已被2占有,需要进入阻塞等待资源释放
同时线程2需要获得mtx1,但已被1占有,需要进入阻塞等待资源释放
故整个程序进入阻塞,无法执行
活锁
对于上述问题,解决方法之一是使用timed_mutex+try_lock_for
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include<iostream>
#include<mutex>
#include<thread>
#include <chrono>
using namespace std::chrono_literals;
class dosome{
public:
void f1(){
while(1){
std::unique_lock lock1(mtx1, std::defer_lock);
std::unique_lock lock2(cout_mtx, std::defer_lock);
if (!lock1.try_lock_for(100ms)) continue;
if (!lock2.try_lock_for(100ms)) continue;
count++;
std::cout << "count: "<<count<<std::endl;
break;
}
}
void f2(){
while(1){
std::unique_lock lock1(cout_mtx, std::defer_lock);
std::unique_lock lock2(mtx1, std::defer_lock);
if (!lock1.try_lock_for(100ms)) continue;
if (!lock2.try_lock_for(100ms)) continue;
count--;
std::cout << "count: "<<count<<std::endl;
break;
}
}
void calc(int n){
for (int i = 0;i<n;i++){
if (n%2){
f2();
}
else{
f1();
}
}
}
private:
std::timed_mutex mtx1;
std::timed_mutex cout_mtx;
int count=0;
};
int main(){
dosome d;
const int N1 = 10000;
const int N2 = 10001;
std::thread th1(dosome::calc, &d, N1);
std::thread th2(dosome::calc, &d, N2);
th1.join();
th2.join();
}但是运行起来会出现一顿一顿的情况
此时程序进入了一种活锁的状态,即
线程1锁定mtx1,线程2锁定cout_mtx
线程1试图锁定cout_mtx1失败,进入阻塞,超时后释放mtx1进入下个循环,
线程2试图锁定mtx1失败,进入阻塞,超时后释放cout_mtx进入下个循环
以此类推
防止该情况的发生,需要注意调用次序,注意保证按照顺序获得互斥量
更加方便的方法是使用std::lock()函数或scope_lock类统一管理多个互斥量
lock函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include<iostream>
#include<mutex>
#include<thread>
#include <chrono>
using namespace std::chrono_literals;
class dosome{
public:
void f1(){
{
std::lock(mtx1, cout_mtx);
std::lock_guard lock1(mtx1, std::adopt_lock);
std::lock_guard lock2(cout_mtx, std::adopt_lock);
count++;
std::cout << "count: "<<count<<std::endl;
}
}
void f2(){
{
std::lock(mtx1, cout_mtx);
std::lock_guard lock1(cout_mtx, std::adopt_lock);
std::lock_guard lock2(mtx1, std::adopt_lock);
count--;
std::cout << "count: "<<count<<std::endl;
}
}
void calc(int n){
for (int i = 0;i<n;i++){
if (n%2){
f2();
}
else{
f1();
}
}
}
private:
std::mutex mtx1;
std::mutex cout_mtx;
int count=0;
};
int main(){
dosome d;
const int N1 = 10000;
const int N2 = 10001;
std::thread th1(dosome::calc, &d, N1);
std::thread th2(dosome::calc, &d, N2);
th1.join();
th2.join();
}此时lock函数会接管下面的两个lock_guard,当他们被销毁时,析构函数中会自动释放两个互斥量
即使传入的两个互斥量顺序不同,lock的特殊算法也可避免死锁的发生
scope_lock类
该类更加方便,使用多个类进行初始化
构造时可以自动调用lock函数,析构时自动释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include<iostream>
#include<mutex>
#include<thread>
#include <chrono>
using namespace std::chrono_literals;
class dosome{
public:
void f1(){
{
std::scoped_lock lock(mtx1, cout_mtx);
count++;
std::cout << "count: "<<count<<std::endl;
}
}
void f2(){
{
std::scoped_lock lock(cout_mtx, mtx1);
count--;
std::cout << "count: "<<count<<std::endl;
}
}
void calc(int n){
for (int i = 0;i<n;i++){
if (n%2){
f2();
}
else{
f1();
}
}
}
private:
std::mutex mtx1;
std::mutex cout_mtx;
int count=0;
};
int main(){
dosome d;
const int N1 = 10000;
const int N2 = 10001;
std::thread th1(dosome::calc, &d, N1);
std::thread th2(dosome::calc, &d, N2);
th1.join();
th2.join();
}读写锁
若存在三个线程读取数据、一个线程写入数据,若使用mutex互斥量进行操作,读写步骤几乎是排队进行的,导致了程序运行效率底下
我们又知道,在没有数据写入的情况下,多个线程同时读取数据是不会发生数据竞争的,不必独占资源
故我们可以规定:
- 一个线程读取数据时,允许其他线程同时读取数据,不允许其他线程写数据
- 一个线程写数据时,不允许其他线程读写数据
这样可以显著提高效率
C++14以上,这样的读写锁的功能由shared_mutex类、shared_timed_mutex提供
该类提供两个模式和两个锁:
- 独占模式:仅有一个线程可以占有互斥量
- 独占锁:独占模式下,当一个线程获得独占锁,任何其他线程都不能获得独占锁和共享锁
- 共享模式:可以有多个线程分享互斥量
- 共享锁:共享模式下,当一个线程获得共享锁,其他线程可获得共享锁,不能获得独占锁
读写锁广泛应用于读多写少的场景下,是一种重要的同步原语
条件变量
条件变量是并发程序中经常用到的同步原语,可以实现在线程间发生通知以实现线程间的同步
工作流
如提供两个线程,用于读取与写入数据,读取要在写入之后
- 接收线程获得互斥量mutex,使用条件变量来等待发送线程的通知,等待时释放mutex
- 发送线程获得mutex,并对共享数据进行写入,完成后释放mutex
- 发送线程使用条件变量发送通知,接收线程接收到通知后重新获得mutex,结束阻塞继续运行
- 接收线程读取处理共享数据,处理完毕后释放mutex
使用
标准库conditional_variable实现条件变量原语,包含以下函数:
等待:
- wait:传入锁的引用以及解除阻塞的条件。进入等待状态时释放锁,结束后获得锁,且都是原子操作
- wait_for:
- wait_until:
通知:
- notify_one:
- notify_all:
例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include<iostream>
#include<thread>
#include<mutex>
#include <chrono>
#include <condition_variable>
using namespace std::chrono_literals;
class channel{
public:
void getData(){
auto tid = std::this_thread::get_id();
std::unique_lock lck(mutex_);
std::cout<<"接收方["<<tid<<"]等待数据"<<std::endl;
condVar.wait(lck);
std::cout<<"接收方["<<tid<<"]得到数据"<<sharedData<<std::endl;
sharedData.clear();
}
void setData(){
static int id = 1;
std::stringstream ss;
ss << "hello #"<<id;
{
std::unique_lock lck(mutex_);
sharedData = ss.str();
std::cout <<"\n发送方:第"<<id<<"条数据写入完毕"<<std::endl;
id++;
}
condVar.notify_one();
std::this_thread::sleep_for(1ms);
}
private:
std::mutex mutex_;
std::condition_variable condVar;
std::string sharedData;
};
int main(){
channel c;
std::thread write_th(channel::setData, &c);
std::thread read_th(channel::getData, &c);
read_th.join();
write_th.join();
}丢失唤醒与虚假唤醒
发现程序运行时,有时会出现死锁的状态
从输出发现,写线程在读线程进入等待前就准备好了数据并发出通知,而读线程进入状态后才
准备接受通知,于是错过了通知,一直等待,发生丢失唤醒
除了丢失唤醒,等待函数还会出现虚假唤醒:即等待线程在条件未满足的情况下被唤醒了
原因与底层硬件软件的异常有关
因此我们最好使用wait函数的重载版本,设置一个等待条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include<iostream>
#include<thread>
#include<mutex>
#include <chrono>
#include <condition_variable>
using namespace std::chrono_literals;
class channel{
public:
void getData(){
auto tid = std::this_thread::get_id();
std::unique_lock lck(mutex_);
std::cout<<"接收方["<<tid<<"]等待数据"<<std::endl;
condVar.wait(lck, [this]{return !sharedData.empty();});
if (sharedData.empty()){
std::cout<<"接收方["<<tid<<"] 虚假唤醒"<<std::endl;
}
else std::cout<<"接收方["<<tid<<"]得到数据"<<sharedData<<std::endl;
sharedData.clear();
}
void setData(){
static int id = 1;
std::stringstream ss;
ss << "hello #"<<id;
{
std::unique_lock lck(mutex_);
sharedData = ss.str();
std::cout <<"\n发送方:第"<<id<<"条数据写入完毕"<<std::endl;
id++;
}
condVar.notify_one();
std::this_thread::sleep_for(1ms);
}
private:
std::mutex mutex_;
std::condition_variable condVar;
std::string sharedData;
};
int main(){
channel c;
std::thread write_th(channel::setData, &c);
std::thread read_th(channel::getData, &c);
read_th.join();
write_th.join();
}信号量
实现对有限资源的并发访问控制,内部有一个计数器,表示资源的可用数量
线程通过信号量来获得对资源的访问许可,若大于0,可以进行访问,计数器减一
信号量要与互斥量结合使用,用信号量来限制并发访问数量,用互斥量实现对具体数据同步
C++标准库中,有一个类模板counting_semaphore,其中参数LeastMaxValue规定了计数最大值不小于该值,还有他的特化版本binary_semaphore,值设定为1
函数:
- release:将计数值+1,是原子操作
- acquire:若计数值大于1,则将计数值-1,如果为0,则该函数阻塞直到计数值大于1
- try_acquire:成功时将计数值-1,返回true
- try_acquire_for:阻塞时最多等待的时间
- try_acquire_until:阻塞时最多等待到某个时刻
例:循环读写buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <iostream>
#include <thread>
#include <chrono>
#include <random>
#include <array>
#include <semaphore>
#include <syncstream>
// 简单的随机辅助类
class Random {
public:
// 构造 [l, r] 的整数随机
Random(int l, int r)
: eng(std::random_device{}()),
dist(l, r)
{}
int operator()() {
return dist(eng);
}
private:
std::mt19937 eng;
std::uniform_int_distribution<int> dist;
};
// RWBuffer 模板
template <size_t BuffSize = 6>
class RWBuffer {
public:
RWBuffer() {
buff.fill(' '); // 填充空格
}
// 模拟生产者准备数据
char prepareData() {
std::this_thread::sleep_for(std::chrono::milliseconds(rgen()));
return chargen();
}
// 模拟消费者处理数据
void processData(char /*ch*/) {
std::this_thread::sleep_for(std::chrono::milliseconds(rgen2()));
}
// 原子打印缓存区
void printBuffer() {
std::osyncstream sync(std::cout);
sync << "BUFFER: |";
for (auto c : buff) {
sync << c << "|";
}
sync << "\n";
}
// 写线程函数
void writeToBuffer(size_t iterations) {
std::osyncstream(std::cout) << "写线程就绪.\n";
// 总共写 iterations * BuffSize 次
for (size_t i = 0; i < iterations * BuffSize; i++) {
char ch = prepareData();
sem_writer.acquire(); // 等待有空位
size_t idx = i % BuffSize;
buff[idx] = ch;
std::osyncstream(std::cout) << "写线程:写入 '" << ch << "' 到位置 " << idx << "\t";
printBuffer();
sem_reader.release(); // 通知读线程
}
}
// 读线程函数
void readFromBuffer(size_t iterations) {
std::osyncstream(std::cout) << "读线程就绪.\n";
for (size_t i = 0; i < iterations * BuffSize; i++) {
sem_reader.acquire(); // 等待有数据
size_t idx = i % BuffSize;
char ch = buff[idx];
processData(ch);
buff[idx] = ' ';
std::osyncstream(std::cout) << "读线程:读取 '" << ch << "' 从位置 " << idx << "\t";
printBuffer();
sem_writer.release(); // 通知写线程
}
}
private:
Random rgen{500,1000};
Random rgen2{800,1500};
Random chargen{'A','Z'};
std::array<char, BuffSize> buff;
std::counting_semaphore<BuffSize> sem_reader{0};
std::counting_semaphore<BuffSize> sem_writer{BuffSize};
};
int main(){
constexpr size_t N = 6;
constexpr size_t ITERS = 3;
RWBuffer<N> buffer;
// 启动写线程和读线程
std::thread writer(&RWBuffer<N>::writeToBuffer, &buffer, ITERS);
std::thread reader(&RWBuffer<N>::readFromBuffer, &buffer, ITERS);
writer.join();
reader.join();
return 0;
}异步任务
将函数调用交给另一个线程异步执行,原有线程继续执行代码,在需要时再去获得执行结果
C++中提供了async与packaged_task进行异步任务
async
语法:
std::future<返回类型> async(std::launch policy, F& f, Args&&... args);
std::future<返回类型> async(F& f, Args&&... args);(默认策略)
执行策略(用整型不同bit位表示):
std::launch::async:异步调用,00000001,相当于新建或使用线程池中的线程并在线程中调用函数std::launch::deferred:推迟调用,00000010,async返回的future对象用于存储函数指针、对象、实参拷贝,调用get函数时会同步调用存储的这个函数std::launch::async|std::launch::deferred:组合,00000011,同步还是异步执行根据具体实现决定
packaged_task
方法
- get_future():获得一个future对象,封装函数的返回结果存储在future对象的共享状态中
void operator()(ArgTypes... args):用于调用封装的函数,将返回结果存储在future中,此时会同步执行封装函数- reset():用于重复使用packaged_task,清空状态获得一个新的future对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include<iostream>
#include<thread>
#include<random>
#include<vector>
#include<future>
#include<syncstream>
// 简单的随机辅助类
class Random {
public:
// 构造 [l, r] 的整数随机
Random(int l, int r)
: eng(std::random_device{}()),
dist(l, r)
{}
int operator()() {
return dist(eng);
}
private:
std::mt19937 eng;
std::uniform_int_distribution<int> dist;
};
float Compute(std::vector<float> &v){
std::this_thread::sleep_for(std::chrono::seconds(2));
auto r = std::accumulate(v.begin(), v.end(), 0.0f);
std::osyncstream(std::cout)<<"执行任务的线程id="<<std::this_thread::get_id()<<std::endl;
return r;
}
int main(){
const int RN_MAX = 10000;
Random rgen(0, RN_MAX);
std::vector<float> numbers(100, 0.0f);
std::generate(numbers.begin(), numbers.end(), [&]{return float(rgen()/RN_MAX);});
std::cout <<"创建任务的线程id:"<<std::this_thread::get_id()<<std::endl;
std::packaged_task<float(std::vector<float>&)> task(Compute);
std::cout<<"直接调用"<<std::endl;
std::future<float> result = task.get_future();
task(numbers);
std::cout <<"result="<<result.get()<<std::endl;
task.reset();
std::cout<<"线程调用"<<std::endl;
result = task.get_future();
std::thread t(std::move(task), std::ref(numbers));
std::cout<<"result="<<result.get()<<std::endl;
t.join();
return 0;
}线程屏障
线程屏障提供了一个计数器,每个参与任务的线程完成自己的任务后计数器-1,而需要同步的线程可以阻塞到计数器为0时再执行后面的代码
latch
latch是一次性的计数,参数为作为计数器的初始值
- count_down(n=1):将计数器减去传入的数值,默认为1,是原子操作
- wait():将线程阻塞到计数器为0后释放
- try_wait():检查计数器是否为0,返回true为0,返回false不一定为0
- arrive_and_wait(n=1):执行了count_down和wait
e.g.模拟paobu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include<iostream>
#include<thread>
#include<latch>
#include<random>
#include<chrono>
struct Runner{
std::string name;
int time = 0;
void run(std::latch& start, std::latch& end){
start.wait(); // 等待发令枪响
// 模拟跑步过程
auto start_time = std::chrono::system_clock::now();
std::mt19937 rng(std::random_device{}());
std::uniform_int_distribution<unsigned int> uniform_dist(0, 2000);
std::this_thread::sleep_for(std::chrono::milliseconds(9600+uniform_dist(rng)));
auto end_time = std::chrono::system_clock::now();
time = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
end.count_down(1); // 跑完,计数器-1
}
bool operator<(const Runner& other) const {
return time < other.time;
}
};
int main(){
std::vector<Runner> runners = {
{"Alice"}, {"Bob"}, {"Charlie"}, {"Diana"}, {"Eve"}
};
// 等待阶段
const int runner_count = runners.size();
std::latch start(1); // 代表发令枪,等到为0时执行后面代码
std::latch finish(runner_count); // 代表完赛选手数量,有一个选手冲线则-1,直到为0时比赛结束
std::vector<std::thread> threads;
for (int i = 0; i<runner_count; ++i) {
threads.emplace_back(&Runner::run, &runners[i], std::ref(start), std::ref(finish));
}
std::cout <<"发令枪响"<< std::endl;
start.count_down(); // 发令枪响,所有选手开始跑
finish.wait(); // 等待所有选手跑完,阻塞
// 结束后统计成绩
std::cout <<"比赛结束"<< std::endl;
std::sort(runners.begin(), runners.end());
for (const auto& runner : runners) {
std::cout << runner.name << " : " << float(runner.time)/1000 << "秒" << std::endl;
}
for(auto& thread : threads) {
thread.join();
}
}1
2
3
4
5
6
7
发令枪响
比赛结束
Eve : 9.946秒
Bob : 10.034秒
Diana : 10.092秒
Alice : 10.842秒
Charlie : 11.621秒barrier
barrier每次计数到0时,都会把初始值恢复,进行下一轮计数
barrier是个类模板,参数为计数器初始值
arrive(n=1):将计数器减去指定参数n
wait():等待计数器为0
arrive_and_wait(n=1):前两个结合
arrive_and_drop(n=1):除了将本次-n,还将下一次的初始值-n
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include<iostream>
#include<thread>
#include<barrier>
#include<random>
#include<chrono>
#include<vector>
class participant {
public:
std::string name;
participant(const std::string& name) : name(name) {}
template <typename BarrierType>
void run(BarrierType& finish){
// 随机数确定是否能存货
std::mt19937 rng(std::random_device{}());
std::uniform_int_distribution<unsigned int> uniform_dist(1000, 2000);
for (int i = 0; i<5;i++){
auto rnd = uniform_dist(rng);
std::this_thread::sleep_for(std::chrono::milliseconds(rnd));
alive = rnd%3;
if (alive){
finish.arrive_and_wait(); // 到达终点,等待其他选手
}
else{
finish.arrive_and_drop(); // 选手掉队,直接结束
break;
}
}
}
bool is_alive() const {
return alive;
}
private:
bool alive = true; // 选手是否存活
};
int main(){
std::vector<participant> participants = {
{"Alice"}, {"Bob"}, {"Charlie"}, {"Diana"}, {"Eve"}
};
const int participant_count = participants.size();
int round = 0;
// barrier每轮结束时的回调函数
auto on_complete = [&participants, &round](){
round++;
std::cout <<"第"<< round << "轮幸存者:"<< std::endl;
int count = 0;
for (auto & p : participants) {
if (p.is_alive()) {
std::cout << p.name << " ";
count++;
}
}
if (count==0){
std::cout << "没有幸存者" << std::endl;
}
std::cout << std::endl;
};
std::barrier finish(participant_count, on_complete); // 参赛者数量为初始值,oncomplete为每轮的回调
std::vector<std::thread> threads;
for (auto &p: participants) {
threads.emplace_back([&p, &finish](){p.run(finish);});
}
for (auto &thread : threads) {
thread.join(); // 等待所有线程结束
}
return 0;
}注:对计数器进行-n操作时,传入的n的值不能大于当前计数值,否则会导致未定义错误
原子操作
原子操作是不能被分割或中断的操作,其他线程只能看到开始与结束的状态
类型
- Load 加载
- Store 存储
- Read Modify Write 读取-修改-写入(RMW)
atomic_flag
它只有true和false两个值,true表示标志被设置,false表示标志被清除
初始化可以使用ATOMIC_FLAG_INIT宏,也可以使用默认构造函数(C++20后)
clear():清除标志,是一个原子写操作,函数有一个参数
memory_order,指定内存顺序,默认为memory order sequentially consistenttest_and_set():将标志设置为true,并返回原有值,是一个RMW操作
(以下为C++20新增)
- test():读取数据
- wait(oldvalue):等待条件达成
- notify_one():通知其中一个线程结束等待
- notify_all():通知所有线程结束等待
自旋锁
自旋:持续监控变量以等待发生变化的过程
自旋锁消耗资源较高,常用于临界区代码耗时很少的场合
1
2
3
4
5
6
7
8
9
10
11
class SpinLock{
public:
void lock(){
while (flag.test_and_set(std::memory_order_acquire)){}
}
void unlock(){
flag.clear(std::memory_order_release);
}
private:
std::atomic_flag flag{ATOMIC_FLAG_INIT};
};例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include<iostream>
#include<atomic>
#include<thread>
#include<assert.h>
class SpinLock{
public:
void lock(){
while (flag.test_and_set(std::memory_order_acquire)){}
}
void unlock(){
flag.clear(std::memory_order_release);
}
private:
std::atomic_flag flag{ATOMIC_FLAG_INIT};
};
int main(){
SpinLock spin;
int count = 0;
auto inc = [&](){
for (int i = 0; i<1000;i++){
std::lock_guard<SpinLock> lock(spin);
++count;
}
};
std::thread t1(inc);
std::thread t2(inc);
t1.join();
t2.join();
std::cout << "count = " << count << std::endl;
}这里由于自旋锁包含lock与unlock函数,符合C++中的基本锁类型,故可以使用lock_guard自动加锁解锁
例
实现了一个一次生产者与消费者的模拟
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include<iostream>
#include<atomic>
#include<thread>
#include<assert.h>
struct Channel{
void getData(){
std::cout <<"[Consumer] wait for data ready\n";
dataReady.wait(false); // 等待数据准备好
std::cout<<"[Consumer] data is:"<<data<<std::endl;
dataReady.clear();
}
void setData(const std::string& v){
std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // 模拟数据准备时间
std::cout<< "'[Producer] write data\n";
data = v;
dataReady.test_and_set();
dataReady.notify_one();
}
private:
std::atomic_flag dataReady = ATOMIC_FLAG_INIT; // 用于标记数据是否准备好
std::string data; // 存储数据
};
int main(){
Channel channel;
std::thread consumer(Channel::getData, &channel);
std::thread producer(Channel::setData, &channel, "Hello, World!");
consumer.join();
producer.join();
}特化版本
atomic除了主模板还有针对指针、智能指针与数据类型的特化版本

其中主模板中的T要求是可平凡拷贝的
有无锁的判断
我们可以通过atomic模板提供的is_lock_free()函数判断有无锁,若无锁(通过CPU的原子指令)则函数返回true,否则返回false
- 基本类型和指针都是无锁的
- 自定义类型与类型大小与对齐有关:类型大小是\(2^n\)且小于最大对齐数(16字节)
主要的原子操作
加载操作:用于原子地获得当前变量的值
1
2
T load(std::memory_order order = std::memory_order_seq_cst) const noexcept;
operator T() const noexcept;存储操作:用于将变量原子地修改为要更新的值
1
2
void store(T desired, std::memory_order order = std::memory_order_seq_cst) noexcept;
T operator=(T desired) noexcept;CAS(比较与交换函数):
通过compare_exchange函数完成
1
2
bool compare_exchange_weak(T& expected, T desired, std::memory_order success, std::memory_order failure) noexcept;
bool compare_exchange_strong(T& expected, T desired, std::memory_order success, std::memory_order failure) noexcept;符合条件的CAS会直接使用CPU硬件指令来实现
其中weak版可能有虚假失败的情况
例:CAS循环模式
CAS是实现无锁算法的重要方式,经常使用CAS循环实现对原子值更改
1
2
3
4
5
6
7
8
9
int fetch_multiply(std::atomic<int> &value, int multiplier){
int oldValue = value.load();
int desire;
do{
desired = oldValue*multiplier;
}while(!value.compare_exchange_strong(oldValuem desired));
return oldValue;
}原理:加载当前值到oldValue,使用其来计算乘积并存储到本地变量desire,调用compare_exchange以原子方式更新value
若原value等于oldValue,说明没有并发线程修改过value,则更新为desired
若不等与则说明value被其他线程修改,那么将oldValue的值更新为当前value重试
例:使用atomic和CAS实现无锁堆栈
在多线程模式下,将新节点的next指向原节点与将head节点指向新节点是两个独立操作,其中第二步的前提是链表头部节点没有变化
此时另一个线程的操作可能会导致链表头部发生变化

如此时节点A向堆栈push了一个新节点A

此时另一个线程又向堆栈push了一个新节点B,head指向了B
若第一个线程还将head更新为节点A,则会将节点B孤立

此时我们在compare_exchange函数中将A与B节点的next节点进行比较,若相等,说明头部没有变化,那么可以将A作为新的节点,将head指向A
否则说明已经发生变化,就将A的next值更新为head当前值,并更新head
pop操作更简单,将head当前值存储在局部变量中,再把其next与当前值进行比较,若相等则说明头部未改变,将head更新为头节点next值,否则说明发生变化,将局部变量更新为head当前值,继续循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include<iostream>
#include<atomic>
#include<thread>
#include<memory>
#include<syncstream>
template<typename T>
class LockFreeStack{
private:
struct Node{
T data;
Node* next = nullptr;
Node(T d):data(d){}
};
std::atomic<Node*> head;
public:
LockFreeStack() = default;
LockFreeStack(const LockFreeStack&) = delete;
LockFreeStack& operator=(const LockFreeStack&) = delete;
void push(T val){
auto *newNode = new Node(val);
//std::shared_ptr<Node> newNode = std::make_shared<Node>(val);
newNode->next = head.load();
while(!head.compare_exchange_strong(newNode->next, newNode));
std::osyncstream(std::cout) <<" Pushed: " << val << "\n";
}
bool pop(T &result){
Node* oldHead = head.load();
do{
if (oldHead==nullptr){
std::osyncstream(std::cout)<< " Stack is empty, cannot pop.\n";
return false;
}
}while(oldHead&&!head.compare_exchange_strong(oldHead, oldHead->next));
if (oldHead){
result = oldHead->data;
std::osyncstream(std::cout)<< " Popped: " << result << "\n";
return true;
}
else return false;
}
};
void pushItems(LockFreeStack<int> &stack, int start, int end){
for (int i = start; i<end;i++){
stack.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
void popItems(LockFreeStack<int> &stack, int count){
int value;
for (int i = 0; i<count;i++){
stack.pop(value);
std::this_thread::sleep_for(std::chrono::milliseconds(30));
}
}
int main(){
LockFreeStack<int> stack;
std::thread t1(pushItems, std::ref(stack), 1, 10);
std::thread t2(popItems, std::ref(stack),7);
std::thread t3(pushItems, std::ref(stack), 10, 20);
std::thread t4(popItems, std::ref(stack),7);
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}(这里shared_ptr不是平凡可复制类,故改为了裸指针,同时无法解决ABA问题)
内存模型
顺序一致性模型
程序按照代码编写顺序执行,并且所有线程都看到相同顺序的模型
枚举值名称为memory_order_seq_cst
该模型顺序简单,不易出错,但可能无法充分使用系统和硬件的性能优化
Relaxed模型
relaxed模型下,不对内存顺序作约束,可能出现顺序一致性模型中不会出现的结果
CPU和内存之间会有多级缓存
缓存、编译器的优化都可能改变同一线程不同语句的执行顺序
枚举值名称为memory_order_relaxed
relaxed模型中,线程执行代码对值的更改首先保存在缓存中,当代码执行完毕,才会更新内存中的值

如该例子中,线程1与2执行了①和③代码,将缓存中的a与b修改为1,再执行②和④代码,读取内存中的a与b都为0
在代码执行完毕后,缓存中的值才会修改到内存中
Acquire/Release模型
枚举值有如下三种:
memory_order_acquirememory_order_releasememory_order_acq_rel
通常在原子操作中,
写入数据使用release,即将值写入后,将缓存中的数据发布到内存中,包括之前修改过的其他变量
获取数据使用acquire,即将值从内存中更新到缓存,并读取这个值
对于先加载在修改的,使用acq_rel
在mutex锁中,mutex.lock()与acquire语义是等价的,mutex.unlock()与release语义是等价的,两者之间是临界区
临界区内的代码可以互相调换顺序,但不能移出临界区外
临界区上方的代码可以进入临界区,但不能移出临界区下方
临界区下方的代码可以进入临界区,但不能移出临界区上方
于此对应的memory_order_acquire与memory_order_release遵循同样的原则