红魔咖啡馆

头发越掉越多,头发越掉越少

0%

【C++】并发编程

并发编程

多个任务的执行方式

  • 顺序:一个接着一个
  • 并行:同一时间执行多个任务(并行是并发的子集)
  • 并发:多个任务在同一个重叠时间段执行

OpenMP

使用#prgma预处理指令可以将代码转变为并行处理

此时需要在编译时添加参数-fopenmp

e.g. 计算\(\pi\)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include<iostream>
using namespace std;
double cal(const long num_steps){
  double step = 1.0/num_steps;
  double sum = 0.0;
#pragma omp parallel for reduction(+:sum)
  for (long i = 0; i<num_steps;i++){
    double x = (i+0.5)*step;
    sum+=4.0/(1.0+x*x);
  }
  return sum*step;
}
int main(){
  cout << cal(10);
}

其中第六行语句的意思:

  • omp parallel for表示将for循环每次循环转化为并行计算

  • reduction表示用于reduce计算的变量与运算符

  • 局部变量x在每个线程中修改,不影响其他线程

  • sum是共享对象,不能在并行线程中直接修改,否则导致数据竞争

    sum需要reduce操作,其中reduction表明了需要reduce操作的变量与运算符

STL并行算法库

使用<algorithm><numeric>库中的函数,可以简单实现并行

在支持并行的算法函数中,首个参数可以选择执行策略:

  • std::execution::sequenced_policy:顺序执行策略(实例:seq)
  • std::execution::parallel_policy:并行执行策略(实例:par)
  • std::execution::parallel_unsequenced_policy:多线程加向量化策略(实例:par_seq)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <algorithm>
#include <iostream>
#include <execution>
#include <vector>
int main(){
  auto op = [](int v){std::cout << v << ";\n";};
  std::vector<int> numbers{1,2,3,4,5,6,7,8,9};
  std::cout <<"sequential for_each"<<std::endl;
  for_each(std::execution::seq, numbers.begin(), numbers.end(), op);
  std::cout <<"parallel for_each"<<std::endl;
  for_each(std::execution::par, numbers.begin(), numbers.end(), op);
  std::cout <<"parallel unseq for_each"<<std::endl;
  for_each(std::execution::par_unseq, numbers.begin(), numbers.end(), op);
}

以上没有对cout进行同步措施,会导致输出错误

线程

线程与进程

进程:程序执行过程,相互独立,操作系统为每一个进程分配一个执行控制块来控制进程

线程:进程中的执行过程,系统为每一个线程分配一个栈于线程控制块,线程间共享堆与全局数据

创建线程

使用<thread>类创建线程

thread(F&& f, Args&&... args);

  • 第一个参数为线程的入口函数(线程函数),可以是各种可调用的对象
  • 后面的Args为传递给可调用对象的实参
  • 若第一个参数为非静态成员函数,则后面的第一个参数为调用这个成员函数的类对象地址

thread的返回值会被忽略,故要使用按引用传递的参数,或存储在类对象的数据成员中

用该函数初始化后会创建一个新线程,执行线程函数

线程只能移动构造或移动赋值,防止多个线程对象代表一个执行线程

.join()

该函数会使得当前线程执行结束后等待其他线程回会合,避免出现提前结束导致出现错误

这种等待状态称为阻塞,若对应线程提前结束,则join函数也会直接返回

调用函数时,该线程必须是joinable的,意味着该线程必须是正在执行或执行完毕但未合并的线程,可以使用.joinable()判断是否可合并

不能被合并的情况:

  • 默认构造函数初始化的
  • join()完成的
  • detach()后的
  • 被移动的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include<thread>
#include<iostream>
#include<array>

void greeting(int idx){
  printf("Hello from thread %d\n", idx);
}

int main(){
  std::array<std::thread, 10> workers;
  for (int i = 0;i<10;i++){
    workers[i] = std::thread(greeting, i);
  }
  for (auto &workers:workers){
    workers.join();
  }
}
1
2
3
4
5
6
7
8
9
10
Hello from thread 0
Hello from thread 1
Hello from thread 2
Hello from thread 3
Hello from thread 4
Hello from thread 5
Hello from thread 6
Hello from thread 7
Hello from thread 8
Hello from thread 9

Fork and join

可以使用线程实现并行

通过Fork and join模式:把要执行的数据分成n份,由n个线程并行处理自己的那部分数据,再进行汇总,n可以根据cpu核心设置

实现简单,但对于复杂计算,相同数据处理时间不同,导致有些线程很早处理完毕,有的需要更长时间结束

实现transform_reduce函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include
#include
#include
#include
template
void transform_reduce(Iter first, Iter last, UnaryOp transform, BinaryOp reduce, Result& res){
  for (auto it=first;it numbers;
  numbers.reserve(N);
  for (int i = 0; i < N; ++i) {
    numbers.push_back((double)i/N);
  }
  auto transform = [](double x) { return x * x; };
  auto reduce = [](double x, double y) { return x + y; };
  const int N_thraeds = 8;
  std::vector workers;
  std::array subResults={};
  for (int i = 0; i::iterator,
        decltype(transform),
        decltype(reduce),double>,
        low, high, transform, reduce, std::ref(subResults[i])
      )
    );
  }
  double result = 0;
  for (int i = 0; i < N_thraeds; ++i) {
    workers[i].join();
    result = reduce(result,subResults[i]);
  }
  std::cout << "Result: " << result << std::endl;
  return 0;
}

Divide and conquer

使用分治将一个任务分成两个小任务,由两个线程完成,然后小任务再分成两个小任务,创建两个线程完成,直到不能分解

另外需要设计一个阈值,当线程过多时就不再创建,而是使用同步计算

为了让各个线程任务饱满,可以采用一半采用同步调用,一半分给另一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include<thread>
#include<vector>
#include<array>
#include<iostream>
template<typename Iter, typename UnaryOp, typename BinaryOp, typename Result, std::size_t MinDist = 10>
void transform_reduce(Iter first, Iter last, UnaryOp transform, BinaryOp reduce, Result& res){
  std::size_t distance = std::distance(first, last);
  if (distance==1){
    res = reduce(res, transform(*first));
  }
  else{
    std::size_t half = distance / 2;
    Iter middle = first+half;

    Result r_result = 0;
    Result l_result = 0;

    std::thread l_thread(
      transform_reduce<Iter, UnaryOp, BinaryOp, Result>,
      first, middle, transform, reduce, std::ref(l_result)
    );
    std::thread r_thread(
      transform_reduce<Iter, UnaryOp, BinaryOp, Result>,
      middle, last, transform, reduce, std::ref(r_result)
    );

    l_thread.join();
    r_thread.join();
    res = reduce(res, reduce(l_result, r_result));

  }
}

promise & future

这两个类提供了线程之间简单的通信机制

通过promise来设置值或异常,通过future来获得值或异常

一对promise&future共享一个shared state(共享状态),是一个类对象,提供了在保证线程安全的情况下,设置和获取状态与结果的方法,使用shared_ptr指向该状态

常用于发送-接受模式中

promise

promise类模板提供了按值、按引用传递数据,以及仅传递信号三种版本

对应的成员函数:

  • promise():构造函数,创建一个共享状态
  • promise(promise&& x):移动构造函数,将被移动对象x的共享状态转移过去
  • get_future():创建一个future对象,使用promise中的共享状态,只能调用一次
  • set_value():设置共享状态中的值,并设置为就绪状态
  • set_exception():设置共享状态的异常

注:为了保证一对一性,promise类删除了拷贝构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include<future>
#include<iostream>
void compute_pi(const long num_steps, std::promise<double>&& promise){
  double step = 1.0/ num_steps;
  double sum = 0.0;
  for(long i = 0; i < num_steps; ++i){
    double x = (i + 0.5) * step;
    sum += 4.0 / (1.0 + x * x);
  }
  promise.set_value(sum * step);
}

void display(std::future<double>&& receiver)
{
  double pi = receiver.get();
  std::cout << "Computed value of Pi: " << pi << std::endl;
}

int main(){
  const int N_steps = 100000000;
  std::promise<double> promise;
  auto receiver = promise.get_future(); // 调用获得future对象后就没什么用了
  std::thread th1(compute_pi, N_steps, std::move(promise)); // 将promose对象转换为xvalue,作为实参换入
  std::thread th2(display, std::move(receiver));
  th1.join();
  th2.join();
}

future

future中的方法主要用于获取状态与结果

  • get():获得结果,若对应结果没有处在就绪状态,那么这个调用会阻塞直到就绪
  • wait():等待状态就绪,之后返回,此时调用get可以直接获得结果
  • wait_for():等待指定时长,返回状态码
  • wait_until():等待到指定时刻,返回状态码
  • share():

状态码包括:

  • ready:共享状态就绪
  • timeout:超时
  • deferred:共享状态中有延缓执行的函数(显式请求中才会调用的函数)

若需要让多个线程获得结果,需要shared_future,构建方法如下:

  • 定义对象,使用promise的get_future()作为初始化参数
  • 在原有future上调用方法.share(),此时原有不再使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include<future>
#include<iostream>
#include<sstream>

void compute_pi(const long num_steps, std::promise<double>&& promise){
  double step = 1.0/ num_steps;
  double sum = 0.0;
  for(long i = 0; i < num_steps; ++i){
    double x = (i + 0.5) * step;
    sum += 4.0 / (1.0 + x * x);
  }
  promise.set_value(sum * step);
}
void display(std::shared_future<double> reveiver){
  static std::mutex mu;
  double pi = reveiver.get();
  printf("Computed value of Pi: %f\n", pi);
}

int main(){
  const int N_steps = 100000000;
  std::promise<double> promise;
  std::shared_future<double> shared_receiver(promise.get_future());
  std::thread th1(compute_pi, N_steps, std::move(promise));
  // 使用shared_future可以让多个线程共享同一个future对象
  std::thread th2(display, shared_receiver);
  std::thread th3(display, shared_receiver);
  th1.join();
  th2.join();
  th3.join();
}

互斥和锁

多线程中,当多个线程队同一个对象进行访问,并且至少有一个在写数据时,会产生数据竞争

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include<array>
#include<thread>
#include<iostream>

void inc(int &res){
  for(int i = 0; i < 1000000; ++i){
    res++;
  }
}

int main(){
  int counter = 0;
  std::thread th1(inc, std::ref(counter));
  std::thread th2(inc, std::ref(counter));

  th1.join();
  th2.join();
  std::cout << "Counter: " << counter << std::endl;
}

该程序创建了两个线程进行累加,但最后得到的结果都不是正确结果,且每次都不一样

正常

这是一个正常的执行过程

错误执行

若顺序是这样结果就不对了

互斥量

C++中所有运算默认为非原子操作,即该线程操作随时会被终端或被其他线程访问

mutex提供了一种同步机制,实现多个线程的安全访问

C++11开始,提供了std::mutex来实现互斥原语,是最常用的同步原语之一

有三个主要函数:

  • lock():锁定互斥量,即获得互斥量,若已经被其他线程锁定,则会进入阻塞直到被解锁
  • try_lock():尝试锁定互斥量,若成功锁定则返回true,否则返回false(不保证被其他线程锁定)
  • unlock():解锁,释放互斥量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include<array>
#include<thread>
#include<iostream>
#include<mutex>

class counter{
public:
  void inc(){
    // 对要修改的资源进行保护
    // 保证同一时间只有一个线程可以修改资源
    // 即要么获得互斥量的锁,要么等待
    counter_mutex.lock();
    m_count++; // 临界区
    counter_mutex.unlock();
  }
  int get(){
    int temp;
    counter_mutex.lock();
    temp = m_count; // 临界区
    counter_mutex.unlock();
    return temp;
  }
private:
  int m_count = 0;
  std::mutex counter_mutex;
};
int main(){
  counter c;
  auto increase = [](counter &c){
    for(int i = 0; i < 1000000; ++i){
      c.inc();
    }
  };
  std::thread th1(increase, std::ref(c));
  std::thread th2(increase, std::ref(c));

  th1.join();
  th2.join();
  std::cout << "Counter: " << c.get() << std::endl;
}

其中,被mutex保护的操作区域叫做临界区

为了避免未释放互斥量的问题,我们通常结合以下对象使用

  • lock_guard():可以通过调用上锁后自动解锁,防止忘记解锁以及出现异常离开作用域时没有解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class counter{
public:
  void inc(){
    std::lock_guard lock(counter_mutex);
    m_count++; // 临界区
  }
  int get(){
    std::lock_guard lock(counter_mutex);
    return m_count;
  }
private:
  int m_count = 0;
  std::mutex counter_mutex;
};
  • unique_lock():更加灵活,可以随时指定锁定与解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include<array>
#include<thread>
#include<iostream>
#include<mutex>

class counter{
public:
  void inc(int n){
    std::unique_lock lock(counter_mutex, std::defer_lock);
    while(n--){
      lock.lock(); // 对要修改的资源进行保护
      m_count++; // 临界区
      lock.unlock();
    }
  }
  int get(){
    std::unique_lock lock(counter_mutex);
    return m_count; 
  }
private:
  int m_count = 0;
  std::mutex counter_mutex;
};
int main(){
  counter c;
  auto increase = [](counter &c){
    c.inc(1000000);
  };
  std::thread th1(increase, std::ref(c));
  std::thread th2(increase, std::ref(c));

  th1.join();
  th2.join();
  std::cout << "Counter: " << c.get() << std::endl;
}
  • timed_mutex:有等待时长的互斥
    • try_lock_for():指定时长内获得mutex的所有权
    • try_lock_until():指定时刻前获得mutex的所有权

​ 返回true表明获得mutex,否则返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include<mutex>
#include<thread>
#include<iostream>
#include<chrono>
#include<sstream>

using namespace std::chrono_literals;

class TryDemo{
public:
  void print(){
    for (int i = 0; i<10; i++){
      std::unique_lock lock(m_mutex, std::defer_lock);
      if (lock.try_lock_for(100ms)) {
        {
        std::lock_guard guard(cout_mutex);
        std::cout <<"["<<std::this_thread::get_id()<<"] "
                  << "Got the lock, printing " << i << std::endl;
        }
        std::this_thread::sleep_for(100ms);
      } // 尝试获取锁,
      else{
        std::lock_guard guard(cout_mutex);
        std::cout <<"["<<std::this_thread::get_id()<<"] "
                  << "Failed to get the lock, skipping " << i << std::endl;
        std::this_thread::sleep_for(100ms);
       }
    }
  }
private:
  std::timed_mutex m_mutex;
  std::mutex cout_mutex; 
  int m_count = 0;
};

int main(){
  TryDemo demo;
  auto print = [](TryDemo &demo){
    demo.print();
  };
  std::thread th1(print, std::ref(demo));
  std::thread th2(print, std::ref(demo));
  th1.join();
  th2.join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[2] Got the lock, printing 0
[3] Failed to get the lock, skipping 0
[2] Got the lock, printing 1
[3] Failed to get the lock, skipping 1
[2] Got the lock, printing 2
[3] Failed to get the lock, skipping 2
[2] Got the lock, printing 3
[3] Failed to get the lock, skipping 3
[2] Got the lock, printing 4
[3] Failed to get the lock, skipping 4
[2] Got the lock, printing 5
[2] Got the lock, printing 6
[3] Failed to get the lock, skipping 5        
[3] Got the lock, printing 6
[3] Got the lock, printing 7
[2] Failed to get the lock, skipping 7        
[2] Got the lock, printing 8
[3] Failed to get the lock, skipping 8
[2] Got the lock, printing 9
[3] Failed to get the lock, skipping 9

死锁

单一线程中多次锁定同一个互斥量会导致死锁

常见死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include
#include
#include

class dosome{
public:
  int f1(){
    std::lock_guard lock(mtx);
    count*=2;
    f2();
    return count;

  }
  void f2(){
    std::lock_guard(mtx); // 已经在外层被锁定过了
    count++;
  }
private:
  std::mutex mtx;
  int count=0;
};

此时所在线程会进入阻塞状态

我们使用`recursive的mutx优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include<iostream>
#include<mutex>
#include<thread>

class dosome{
public:
  int f1(){
    std::lock_guard lock(mtx);
    count*=2;
    f2();
    return count;

  }
  void f2(){
    std::lock_guard(mtx); // 已经在外层被锁定过了
    count++;
  }
private:
  std::recursive_mutex mtx;
  int count=0;
};

当被锁定并获得互斥量后,当前线程再调用lock后仍会成功而不会阻塞,直到调用相同数量的unlock()才会被释放

多线程中

多线程中,导致死锁原因较多

  1. 锁定一个互斥量后没有释放,导致后面的线程获取该互斥量时进入阻塞状态

解决方法:使用RAII,即使用lock_guard等自动对象,在离开作用域后自动释放互斥量

  1. 同时对多个互斥量锁定解锁时,获得和释放顺序不同导致死锁

如以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include<iostream>
#include<mutex>
#include<thread>
#include <chrono>
using namespace std::chrono_literals;

class dosome{
public:
  void f1(){
    {
      std::lock_guard lock1(mtx1);
      std::lock_guard lock2(cout_mtx);
      count++;
      std::cout << "count: "<<count<<std::endl;
    }
    std::this_thread::sleep_for(1ms);
   

  }
  void f2(){
    {
      std::lock_guard lock1(cout_mtx);
      std::lock_guard lock2(mtx1); 
      count--;
      std::cout << "count: "<<count<<std::endl;
    }
    std::this_thread::sleep_for(1ms);
  }
  void calc(int n){
    for (int i = 0;i<n;i++){
      if (n%2){
        f2();
      }
      else{
        f1();
      }
    }
  }
  
private:
  std::mutex mtx1;
  std::mutex cout_mtx;
  int count=0;
};
int main(){
  dosome d;
  const int N1 = 10000;
  const int N2 = 10001;
  std::thread th1(dosome::calc, &d, N1);
  std::thread th2(dosome::calc, &d, N2);
  th1.join();
  th2.join();
}

这里f1与f2都对两个互斥量进行锁定,顺序不同

主函数设置两个线程分别为10000次,执行f1,10001次,执行f2

此时运行程序会在一定时间发生死锁:

线程1在某个时刻获得了mtx1,线程2在下一时刻获得了cout_mtx

接下来线程1需要获得cout_mtx,但已被2占有,需要进入阻塞等待资源释放

同时线程2需要获得mtx1,但已被1占有,需要进入阻塞等待资源释放

故整个程序进入阻塞,无法执行

阻塞

活锁

对于上述问题,解决方法之一是使用timed_mutex+try_lock_for

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include<iostream>
#include<mutex>
#include<thread>
#include <chrono>
using namespace std::chrono_literals;

class dosome{
public:
  void f1(){
    while(1){
      std::unique_lock lock1(mtx1, std::defer_lock);
      std::unique_lock lock2(cout_mtx, std::defer_lock);
      if (!lock1.try_lock_for(100ms)) continue;
      if (!lock2.try_lock_for(100ms)) continue;
      count++;
      std::cout << "count: "<<count<<std::endl;
      break;
    }
  }
  void f2(){
    while(1){
      std::unique_lock lock1(cout_mtx, std::defer_lock);
      std::unique_lock lock2(mtx1, std::defer_lock);
      if (!lock1.try_lock_for(100ms)) continue;
      if (!lock2.try_lock_for(100ms)) continue;
      count--;
      std::cout << "count: "<<count<<std::endl;
      break;
    }
  
  }
  void calc(int n){
    for (int i = 0;i<n;i++){
      if (n%2){
        f2();
      }
      else{
        f1();
      }
    }
  }
  
private:
  std::timed_mutex mtx1;
  std::timed_mutex cout_mtx;
  int count=0;
};
int main(){
  dosome d;
  const int N1 = 10000;
  const int N2 = 10001;
  std::thread th1(dosome::calc, &d, N1);
  std::thread th2(dosome::calc, &d, N2);
  th1.join();
  th2.join();
}

但是运行起来会出现一顿一顿的情况

此时程序进入了一种活锁的状态,即

线程1锁定mtx1,线程2锁定cout_mtx

线程1试图锁定cout_mtx1失败,进入阻塞,超时后释放mtx1进入下个循环,

线程2试图锁定mtx1失败,进入阻塞,超时后释放cout_mtx进入下个循环

以此类推

防止该情况的发生,需要注意调用次序,注意保证按照顺序获得互斥量

更加方便的方法是使用std::lock()函数或scope_lock类统一管理多个互斥量

lock函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include<iostream>
#include<mutex>
#include<thread>
#include <chrono>
using namespace std::chrono_literals;

class dosome{
public:
  void f1(){
    {
      std::lock(mtx1, cout_mtx);
      std::lock_guard lock1(mtx1, std::adopt_lock);
      std::lock_guard lock2(cout_mtx, std::adopt_lock);
      count++;
      std::cout << "count: "<<count<<std::endl;
    }

  }
  void f2(){
    {
      std::lock(mtx1, cout_mtx);
      std::lock_guard lock1(cout_mtx, std::adopt_lock);
      std::lock_guard lock2(mtx1, std::adopt_lock); 
      count--;
      std::cout << "count: "<<count<<std::endl;
    }
  }
  void calc(int n){
    for (int i = 0;i<n;i++){
      if (n%2){
        f2();
      }
      else{
        f1();
      }
    }
  }
  
private:
  std::mutex mtx1;
  std::mutex cout_mtx;
  int count=0;
};
int main(){
  dosome d;
  const int N1 = 10000;
  const int N2 = 10001;
  std::thread th1(dosome::calc, &d, N1);
  std::thread th2(dosome::calc, &d, N2);
  th1.join();
  th2.join();
}

此时lock函数会接管下面的两个lock_guard,当他们被销毁时,析构函数中会自动释放两个互斥量

即使传入的两个互斥量顺序不同,lock的特殊算法也可避免死锁的发生

scope_lock类

该类更加方便,使用多个类进行初始化

构造时可以自动调用lock函数,析构时自动释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include<iostream>
#include<mutex>
#include<thread>
#include <chrono>
using namespace std::chrono_literals;

class dosome{
public:
  void f1(){
    {
      std::scoped_lock lock(mtx1, cout_mtx);
      count++;
      std::cout << "count: "<<count<<std::endl;
    }

  }
  void f2(){
    {
      std::scoped_lock lock(cout_mtx, mtx1);
      count--;
      std::cout << "count: "<<count<<std::endl;
    }
  }
  void calc(int n){
    for (int i = 0;i<n;i++){
      if (n%2){
        f2();
      }
      else{
        f1();
      }
    }
  }
  
private:
  std::mutex mtx1;
  std::mutex cout_mtx;
  int count=0;
};
int main(){
  dosome d;
  const int N1 = 10000;
  const int N2 = 10001;
  std::thread th1(dosome::calc, &d, N1);
  std::thread th2(dosome::calc, &d, N2);
  th1.join();
  th2.join();
}

读写锁

若存在三个线程读取数据、一个线程写入数据,若使用mutex互斥量进行操作,读写步骤几乎是排队进行的,导致了程序运行效率底下

我们又知道,在没有数据写入的情况下,多个线程同时读取数据是不会发生数据竞争的,不必独占资源

故我们可以规定:

  • 一个线程读取数据时,允许其他线程同时读取数据,不允许其他线程写数据
  • 一个线程写数据时,不允许其他线程读写数据

这样可以显著提高效率

C++14以上,这样的读写锁的功能由shared_mutex类、shared_timed_mutex提供

该类提供两个模式和两个锁:

  • 独占模式:仅有一个线程可以占有互斥量
  • 独占锁:独占模式下,当一个线程获得独占锁,任何其他线程都不能获得独占锁和共享锁
  • 共享模式:可以有多个线程分享互斥量
  • 共享锁:共享模式下,当一个线程获得共享锁,其他线程可获得共享锁,不能获得独占锁

读写锁广泛应用于读多写少的场景下,是一种重要的同步原语

条件变量

条件变量是并发程序中经常用到的同步原语,可以实现在线程间发生通知以实现线程间的同步

工作流

如提供两个线程,用于读取与写入数据,读取要在写入之后

  1. 接收线程获得互斥量mutex,使用条件变量来等待发送线程的通知,等待时释放mutex
  2. 发送线程获得mutex,并对共享数据进行写入,完成后释放mutex
  3. 发送线程使用条件变量发送通知,接收线程接收到通知后重新获得mutex,结束阻塞继续运行
  4. 接收线程读取处理共享数据,处理完毕后释放mutex

使用

标准库conditional_variable实现条件变量原语,包含以下函数:

等待:

  • wait:传入锁的引用以及解除阻塞的条件。进入等待状态时释放锁,结束后获得锁,且都是原子操作
  • wait_for:
  • wait_until:

通知:

  • notify_one:
  • notify_all:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include<iostream>
#include<thread>
#include<mutex>
#include <chrono>
#include <condition_variable>
using namespace std::chrono_literals;
class channel{
public:
  void getData(){
    auto tid = std::this_thread::get_id();
    std::unique_lock lck(mutex_);
    std::cout<<"接收方["<<tid<<"]等待数据"<<std::endl;
    condVar.wait(lck);
    std::cout<<"接收方["<<tid<<"]得到数据"<<sharedData<<std::endl;
    sharedData.clear();
  }
  void setData(){
    static int id = 1;
    std::stringstream ss;
    ss << "hello #"<<id;
    {
      std::unique_lock lck(mutex_);
      sharedData = ss.str();
      std::cout  <<"\n发送方:第"<<id<<"条数据写入完毕"<<std::endl;
      id++;
    }
    condVar.notify_one();
    std::this_thread::sleep_for(1ms);
  }
private:
  std::mutex mutex_;
  std::condition_variable condVar;
  std::string sharedData;
};

int main(){
  channel c;
  std::thread write_th(channel::setData, &c);
  std::thread read_th(channel::getData, &c);

  read_th.join();
  write_th.join();
}

丢失唤醒与虚假唤醒

发现程序运行时,有时会出现死锁的状态

从输出发现,写线程在读线程进入等待前就准备好了数据并发出通知,而读线程进入状态后才

准备接受通知,于是错过了通知,一直等待,发生丢失唤醒

除了丢失唤醒,等待函数还会出现虚假唤醒:即等待线程在条件未满足的情况下被唤醒了

原因与底层硬件软件的异常有关

因此我们最好使用wait函数的重载版本,设置一个等待条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include<iostream>
#include<thread>
#include<mutex>
#include <chrono>
#include <condition_variable>
using namespace std::chrono_literals;
class channel{
public:
  void getData(){
    auto tid = std::this_thread::get_id();
    std::unique_lock lck(mutex_);
    std::cout<<"接收方["<<tid<<"]等待数据"<<std::endl;
    condVar.wait(lck, [this]{return !sharedData.empty();});
    if (sharedData.empty()){
      std::cout<<"接收方["<<tid<<"] 虚假唤醒"<<std::endl;
    }
    else std::cout<<"接收方["<<tid<<"]得到数据"<<sharedData<<std::endl;
    sharedData.clear();
  }
  void setData(){
    static int id = 1;
    std::stringstream ss;
    ss << "hello #"<<id;
    {
      std::unique_lock lck(mutex_);
      sharedData = ss.str();
      std::cout  <<"\n发送方:第"<<id<<"条数据写入完毕"<<std::endl;
      id++;
    }
    condVar.notify_one();
    std::this_thread::sleep_for(1ms);
  }
private:
  std::mutex mutex_;
  std::condition_variable condVar;
  std::string sharedData;
};

int main(){
  channel c;
  std::thread write_th(channel::setData, &c);
  std::thread read_th(channel::getData, &c);

  read_th.join();
  write_th.join();
}

信号量

实现对有限资源的并发访问控制,内部有一个计数器,表示资源的可用数量

线程通过信号量来获得对资源的访问许可,若大于0,可以进行访问,计数器减一

信号量要与互斥量结合使用,用信号量来限制并发访问数量,用互斥量实现对具体数据同步

C++标准库中,有一个类模板counting_semaphore,其中参数LeastMaxValue规定了计数最大值不小于该值,还有他的特化版本binary_semaphore,值设定为1

函数:

  • release:将计数值+1,是原子操作
  • acquire:若计数值大于1,则将计数值-1,如果为0,则该函数阻塞直到计数值大于1
  • try_acquire:成功时将计数值-1,返回true
  • try_acquire_for:阻塞时最多等待的时间
  • try_acquire_until:阻塞时最多等待到某个时刻

例:循环读写buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <iostream>
#include <thread>
#include <chrono>
#include <random>
#include <array>
#include <semaphore> 
#include <syncstream>

// 简单的随机辅助类
class Random {
public:
    // 构造 [l, r] 的整数随机
    Random(int l, int r)
      : eng(std::random_device{}()),
        dist(l, r)
    {}

    int operator()() {
        return dist(eng);
    }
private:
    std::mt19937 eng;
    std::uniform_int_distribution<int> dist;
};

// RWBuffer 模板
template <size_t BuffSize = 6>
class RWBuffer {
public:
    RWBuffer() {
        buff.fill(' ');    // 填充空格
    }

    // 模拟生产者准备数据
    char prepareData() {
        std::this_thread::sleep_for(std::chrono::milliseconds(rgen()));
        return chargen();
    }

    // 模拟消费者处理数据
    void processData(char /*ch*/) {
        std::this_thread::sleep_for(std::chrono::milliseconds(rgen2()));
    }

    // 原子打印缓存区
    void printBuffer() {
        std::osyncstream sync(std::cout);
        sync << "BUFFER: |";
        for (auto c : buff) {
            sync << c << "|";
        }
        sync << "\n";
    }

    // 写线程函数
    void writeToBuffer(size_t iterations) {

        std::osyncstream(std::cout) << "写线程就绪.\n";
        // 总共写 iterations * BuffSize 次
        for (size_t i = 0; i < iterations * BuffSize; i++) {
            char ch = prepareData();
            sem_writer.acquire();        // 等待有空位
            size_t idx = i % BuffSize;
            buff[idx] = ch;

            std::osyncstream(std::cout) << "写线程:写入 '" << ch << "' 到位置 " << idx << "\t";
            printBuffer();

            sem_reader.release();        // 通知读线程
        }
    }

    // 读线程函数
    void readFromBuffer(size_t iterations) {

        std::osyncstream(std::cout) << "读线程就绪.\n";
        for (size_t i = 0; i < iterations * BuffSize; i++) {
            sem_reader.acquire();        // 等待有数据
            size_t idx = i % BuffSize;
            char ch = buff[idx];
            processData(ch);
            buff[idx] = ' ';

            std::osyncstream(std::cout) << "读线程:读取 '" << ch << "' 从位置 " << idx << "\t";
            printBuffer();

            sem_writer.release();        // 通知写线程
        }
    }

private:
    Random rgen{500,1000};
    Random rgen2{800,1500};
    Random chargen{'A','Z'};
    std::array<char, BuffSize> buff;
    std::counting_semaphore<BuffSize> sem_reader{0};
    std::counting_semaphore<BuffSize> sem_writer{BuffSize};
};

int main(){
    constexpr size_t N = 6;
    constexpr size_t ITERS = 3;

    RWBuffer<N> buffer;

    // 启动写线程和读线程
    std::thread writer(&RWBuffer<N>::writeToBuffer, &buffer, ITERS);
    std::thread reader(&RWBuffer<N>::readFromBuffer, &buffer, ITERS);

    writer.join();
    reader.join();

    return 0;
}

异步任务

将函数调用交给另一个线程异步执行,原有线程继续执行代码,在需要时再去获得执行结果

C++中提供了async与packaged_task进行异步任务

异步

async

语法:

std::future<返回类型> async(std::launch policy, F& f, Args&&... args);

std::future<返回类型> async(F& f, Args&&... args);(默认策略)

执行策略(用整型不同bit位表示):

  • std::launch::async:异步调用,00000001,相当于新建或使用线程池中的线程并在线程中调用函数
  • std::launch::deferred:推迟调用,00000010,async返回的future对象用于存储函数指针、对象、实参拷贝,调用get函数时会同步调用存储的这个函数
  • std::launch::async|std::launch::deferred:组合,00000011,同步还是异步执行根据具体实现决定

packaged_task

方法

  • get_future():获得一个future对象,封装函数的返回结果存储在future对象的共享状态中
  • void operator()(ArgTypes... args):用于调用封装的函数,将返回结果存储在future中,此时会同步执行封装函数
  • reset():用于重复使用packaged_task,清空状态获得一个新的future对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include<iostream>
#include<thread>
#include<random>
#include<vector>
#include<future>
#include<syncstream>
// 简单的随机辅助类
class Random {
public:
    // 构造 [l, r] 的整数随机
    Random(int l, int r)
      : eng(std::random_device{}()),
        dist(l, r)
    {}

    int operator()() {
        return dist(eng);
    }
private:
    std::mt19937 eng;
    std::uniform_int_distribution<int> dist;
};
float Compute(std::vector<float> &v){
  std::this_thread::sleep_for(std::chrono::seconds(2));
  auto r = std::accumulate(v.begin(), v.end(), 0.0f);
  std::osyncstream(std::cout)<<"执行任务的线程id="<<std::this_thread::get_id()<<std::endl;
  return r;
}
int main(){
  const int RN_MAX = 10000;
  Random rgen(0, RN_MAX);
  std::vector<float> numbers(100, 0.0f);
  std::generate(numbers.begin(), numbers.end(), [&]{return float(rgen()/RN_MAX);});
  std::cout <<"创建任务的线程id:"<<std::this_thread::get_id()<<std::endl;
  std::packaged_task<float(std::vector<float>&)> task(Compute);

  std::cout<<"直接调用"<<std::endl;
  std::future<float> result = task.get_future();
  task(numbers);
  std::cout <<"result="<<result.get()<<std::endl;

  task.reset();

  std::cout<<"线程调用"<<std::endl;
  result = task.get_future();
  std::thread t(std::move(task), std::ref(numbers));
  std::cout<<"result="<<result.get()<<std::endl;
  t.join();
  return 0;
}

线程屏障

线程屏障提供了一个计数器,每个参与任务的线程完成自己的任务后计数器-1,而需要同步的线程可以阻塞到计数器为0时再执行后面的代码

latch

latch是一次性的计数,参数为作为计数器的初始值

  • count_down(n=1):将计数器减去传入的数值,默认为1,是原子操作
  • wait():将线程阻塞到计数器为0后释放
  • try_wait():检查计数器是否为0,返回true为0,返回false不一定为0
  • arrive_and_wait(n=1):执行了count_down和wait

e.g.模拟paobu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include<iostream>
#include<thread>
#include<latch>
#include<random>
#include<chrono>

struct Runner{
  std::string name;
  int time = 0;
  void run(std::latch& start, std::latch& end){
    start.wait(); // 等待发令枪响
    // 模拟跑步过程
    auto start_time = std::chrono::system_clock::now();
    std::mt19937 rng(std::random_device{}());
    std::uniform_int_distribution<unsigned int> uniform_dist(0, 2000);
    std::this_thread::sleep_for(std::chrono::milliseconds(9600+uniform_dist(rng)));
    auto end_time = std::chrono::system_clock::now();
    time = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
    end.count_down(1); // 跑完,计数器-1
  } 
  bool operator<(const Runner& other) const {
    return time < other.time;
  }
};

int main(){
  std::vector<Runner> runners = {
    {"Alice"}, {"Bob"}, {"Charlie"}, {"Diana"}, {"Eve"}
  };
  // 等待阶段
  const int runner_count = runners.size();
  std::latch start(1); // 代表发令枪,等到为0时执行后面代码
  std::latch finish(runner_count); // 代表完赛选手数量,有一个选手冲线则-1,直到为0时比赛结束
  std::vector<std::thread> threads;
  for (int i = 0; i<runner_count; ++i) {
    threads.emplace_back(&Runner::run, &runners[i], std::ref(start), std::ref(finish));
  }
  std::cout <<"发令枪响"<< std::endl;
  start.count_down(); // 发令枪响,所有选手开始跑

  finish.wait(); // 等待所有选手跑完,阻塞
  // 结束后统计成绩
  std::cout <<"比赛结束"<< std::endl;
  std::sort(runners.begin(), runners.end());
  for (const auto& runner : runners) {
    std::cout << runner.name << " : " << float(runner.time)/1000 << "秒" << std::endl;
  }
  for(auto& thread : threads) {

      thread.join();
  }
}
1
2
3
4
5
6
7
发令枪响
比赛结束
Eve : 9.946秒
Bob : 10.034秒
Diana : 10.092秒
Alice : 10.842秒
Charlie : 11.621秒

barrier

barrier每次计数到0时,都会把初始值恢复,进行下一轮计数

barrier是个类模板,参数为计数器初始值

  • arrive(n=1):将计数器减去指定参数n

  • wait():等待计数器为0

  • arrive_and_wait(n=1):前两个结合

  • arrive_and_drop(n=1):除了将本次-n,还将下一次的初始值-n

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include<iostream>
#include<thread>
#include<barrier>
#include<random>
#include<chrono>
#include<vector>
class participant {
public:
  std::string name;
  participant(const std::string& name) : name(name) {}
  template <typename BarrierType>
  void run(BarrierType& finish){
    // 随机数确定是否能存货
    std::mt19937 rng(std::random_device{}());
    std::uniform_int_distribution<unsigned int> uniform_dist(1000, 2000);
    for (int i = 0; i<5;i++){
      auto rnd = uniform_dist(rng);
      std::this_thread::sleep_for(std::chrono::milliseconds(rnd));
      alive = rnd%3;
      if (alive){
        finish.arrive_and_wait(); // 到达终点,等待其他选手
      }
      else{
        finish.arrive_and_drop(); // 选手掉队,直接结束
        break;
      }
    }
  }
  bool is_alive() const {
    return alive;
  }
private:
  bool alive = true; // 选手是否存活
};

int main(){
  std::vector<participant> participants = {
    {"Alice"}, {"Bob"}, {"Charlie"}, {"Diana"}, {"Eve"}
  };
  const int participant_count = participants.size();
  int round = 0;
  // barrier每轮结束时的回调函数
  auto on_complete = [&participants, &round](){
    round++;
    std::cout <<"第"<< round << "轮幸存者:"<< std::endl;
    int count = 0;
    for (auto & p : participants) {
      if (p.is_alive()) {
        std::cout << p.name << " ";
        count++;
      }
    }
    if (count==0){
      std::cout << "没有幸存者" << std::endl;
    }
    std::cout << std::endl;

  };
  std::barrier finish(participant_count, on_complete); // 参赛者数量为初始值,oncomplete为每轮的回调
  std::vector<std::thread> threads;
  for (auto &p: participants) {
    threads.emplace_back([&p, &finish](){p.run(finish);});
  }
  for (auto &thread : threads) {
    thread.join(); // 等待所有线程结束
  }
  return 0;
}

注:对计数器进行-n操作时,传入的n的值不能大于当前计数值,否则会导致未定义错误

原子操作

原子操作是不能被分割或中断的操作,其他线程只能看到开始与结束的状态

类型

  • Load 加载
  • Store 存储
  • Read Modify Write 读取-修改-写入(RMW)

atomic_flag

它只有true和false两个值,true表示标志被设置,false表示标志被清除

初始化可以使用ATOMIC_FLAG_INIT宏,也可以使用默认构造函数(C++20后)

  • clear():清除标志,是一个原子写操作,函数有一个参数memory_order,指定内存顺序,默认为memory order sequentially consistent

  • test_and_set():将标志设置为true,并返回原有值,是一个RMW操作

(以下为C++20新增)

  • test():读取数据
  • wait(oldvalue):等待条件达成
  • notify_one():通知其中一个线程结束等待
  • notify_all():通知所有线程结束等待

自旋锁

自旋:持续监控变量以等待发生变化的过程

自旋锁消耗资源较高,常用于临界区代码耗时很少的场合

1
2
3
4
5
6
7
8
9
10
11
class SpinLock{
public:
    void lock(){
        while (flag.test_and_set(std::memory_order_acquire)){}
    }
    void unlock(){
        flag.clear(std::memory_order_release);
    }
private:
    std::atomic_flag flag{ATOMIC_FLAG_INIT};
};

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include<iostream>
#include<atomic>
#include<thread>
#include<assert.h>

class SpinLock{
public:
    void lock(){
        while (flag.test_and_set(std::memory_order_acquire)){}
    }
    void unlock(){
        flag.clear(std::memory_order_release);
    }
private:
    std::atomic_flag flag{ATOMIC_FLAG_INIT};
};
int main(){
  SpinLock spin;
  int count = 0;
  auto inc = [&](){
    for (int i = 0; i<1000;i++){
      std::lock_guard<SpinLock> lock(spin);
      ++count;
    }
  };
  std::thread t1(inc);
  std::thread t2(inc);
  t1.join();
  t2.join();
  std::cout << "count = " << count << std::endl;
}

这里由于自旋锁包含lock与unlock函数,符合C++中的基本锁类型,故可以使用lock_guard自动加锁解锁

实现了一个一次生产者与消费者的模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include<iostream>
#include<atomic>
#include<thread>
#include<assert.h>

struct Channel{
  void getData(){
    std::cout <<"[Consumer] wait for data ready\n";
    dataReady.wait(false); // 等待数据准备好
    std::cout<<"[Consumer] data is:"<<data<<std::endl;
    dataReady.clear();
  }
  void setData(const std::string& v){
    std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // 模拟数据准备时间
    std::cout<< "'[Producer] write data\n";
    data = v;
    dataReady.test_and_set();
    dataReady.notify_one();
  }
private:
  std::atomic_flag dataReady = ATOMIC_FLAG_INIT; // 用于标记数据是否准备好
  std::string data; // 存储数据
  
};

int main(){
  Channel channel;
  std::thread consumer(Channel::getData, &channel);
  std::thread producer(Channel::setData, &channel, "Hello, World!");

  consumer.join();
  producer.join();
}

特化版本

atomic除了主模板还有针对指针、智能指针与数据类型的特化版本

atomic

其中主模板中的T要求是可平凡拷贝的

有无锁的判断

我们可以通过atomic模板提供的is_lock_free()函数判断有无锁,若无锁(通过CPU的原子指令)则函数返回true,否则返回false

  • 基本类型和指针都是无锁的
  • 自定义类型与类型大小与对齐有关:类型大小是\(2^n\)且小于最大对齐数(16字节)

主要的原子操作

加载操作:用于原子地获得当前变量的值

1
2
T load(std::memory_order order = std::memory_order_seq_cst) const noexcept;
operator T() const noexcept;

存储操作:用于将变量原子地修改为要更新的值

1
2
void store(T desired, std::memory_order order = std::memory_order_seq_cst) noexcept;
T operator=(T desired) noexcept;

CAS(比较与交换函数):

通过compare_exchange函数完成

1
2
bool compare_exchange_weak(T& expected, T desired, std::memory_order success, std::memory_order failure) noexcept;
bool compare_exchange_strong(T& expected, T desired, std::memory_order success, std::memory_order failure) noexcept;

符合条件的CAS会直接使用CPU硬件指令来实现

其中weak版可能有虚假失败的情况

例:CAS循环模式

CAS是实现无锁算法的重要方式,经常使用CAS循环实现对原子值更改

1
2
3
4
5
6
7
8
9
int fetch_multiply(std::atomic<int> &value, int multiplier){
    int oldValue = value.load();
    int desire;
    do{
        desired = oldValue*multiplier;
    }while(!value.compare_exchange_strong(oldValuem desired));
    
    return oldValue;
}

原理:加载当前值到oldValue,使用其来计算乘积并存储到本地变量desire,调用compare_exchange以原子方式更新value

若原value等于oldValue,说明没有并发线程修改过value,则更新为desired

若不等与则说明value被其他线程修改,那么将oldValue的值更新为当前value重试

例:使用atomic和CAS实现无锁堆栈

在多线程模式下,将新节点的next指向原节点与将head节点指向新节点是两个独立操作,其中第二步的前提是链表头部节点没有变化

此时另一个线程的操作可能会导致链表头部发生变化

例1

如此时节点A向堆栈push了一个新节点A

例2

此时另一个线程又向堆栈push了一个新节点B,head指向了B

若第一个线程还将head更新为节点A,则会将节点B孤立

例3

此时我们在compare_exchange函数中将A与B节点的next节点进行比较,若相等,说明头部没有变化,那么可以将A作为新的节点,将head指向A

否则说明已经发生变化,就将A的next值更新为head当前值,并更新head

pop操作更简单,将head当前值存储在局部变量中,再把其next与当前值进行比较,若相等则说明头部未改变,将head更新为头节点next值,否则说明发生变化,将局部变量更新为head当前值,继续循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include<iostream>
#include<atomic>
#include<thread>
#include<memory>
#include<syncstream>

template<typename T>
class LockFreeStack{
private:
  struct Node{
    T data;
    Node* next = nullptr;
    Node(T d):data(d){}
  };
  std::atomic<Node*> head;
public:
  LockFreeStack() = default;
  LockFreeStack(const LockFreeStack&) = delete;
  LockFreeStack& operator=(const LockFreeStack&) = delete;

  void push(T val){
    auto *newNode = new Node(val);
    //std::shared_ptr<Node> newNode = std::make_shared<Node>(val);
    newNode->next = head.load();
    while(!head.compare_exchange_strong(newNode->next, newNode));
    std::osyncstream(std::cout) <<" Pushed: " << val << "\n";
  }

  bool pop(T &result){
    Node* oldHead = head.load();
    do{
      if (oldHead==nullptr){
        std::osyncstream(std::cout)<< " Stack is empty, cannot pop.\n";
        return false;
      }
    }while(oldHead&&!head.compare_exchange_strong(oldHead, oldHead->next));
    if (oldHead){
      result = oldHead->data;
      std::osyncstream(std::cout)<< " Popped: " << result << "\n";
      return true;
    }
    else return false;
  }
};

void pushItems(LockFreeStack<int> &stack, int start, int end){
  for (int i = start; i<end;i++){
    stack.push(i);
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  }
}
void popItems(LockFreeStack<int> &stack, int count){
  int value;
  for (int i = 0; i<count;i++){
    stack.pop(value);
    std::this_thread::sleep_for(std::chrono::milliseconds(30));
  }
}

int main(){
  LockFreeStack<int> stack;

  std::thread t1(pushItems, std::ref(stack), 1, 10);
  std::thread t2(popItems, std::ref(stack),7);
  std::thread t3(pushItems, std::ref(stack), 10, 20);
  std::thread t4(popItems, std::ref(stack),7);

  t1.join();
  t2.join();
  t3.join();
  t4.join();
  return 0;
}

(这里shared_ptr不是平凡可复制类,故改为了裸指针,同时无法解决ABA问题)

内存模型

顺序一致性模型

程序按照代码编写顺序执行,并且所有线程都看到相同顺序的模型

枚举值名称为memory_order_seq_cst

该模型顺序简单,不易出错,但可能无法充分使用系统和硬件的性能优化

Relaxed模型

relaxed模型下,不对内存顺序作约束,可能出现顺序一致性模型中不会出现的结果

CPU和内存之间会有多级缓存

缓存、编译器的优化都可能改变同一线程不同语句的执行顺序

枚举值名称为memory_order_relaxed

relaxed模型中,线程执行代码对值的更改首先保存在缓存中,当代码执行完毕,才会更新内存中的值

relaxed

如该例子中,线程1与2执行了①和③代码,将缓存中的a与b修改为1,再执行②和④代码,读取内存中的a与b都为0

在代码执行完毕后,缓存中的值才会修改到内存中

Acquire/Release模型

枚举值有如下三种:

  • memory_order_acquire
  • memory_order_release
  • memory_order_acq_rel

通常在原子操作中,

写入数据使用release,即将值写入后,将缓存中的数据发布到内存中,包括之前修改过的其他变量

获取数据使用acquire,即将值从内存中更新到缓存,并读取这个值

对于先加载在修改的,使用acq_rel

在mutex锁中,mutex.lock()与acquire语义是等价的,mutex.unlock()与release语义是等价的,两者之间是临界区

临界区内的代码可以互相调换顺序,但不能移出临界区外

临界区上方的代码可以进入临界区,但不能移出临界区下方

临界区下方的代码可以进入临界区,但不能移出临界区上方

于此对应的memory_order_acquirememory_order_release遵循同样的原则