现代C++并发编程基础

本文最后更新于:August 26, 2022 am

这篇博客为笔者学习了大佬Arthur O’Dwye做的关于C++并发库的演讲后写的一些记录,大佬演讲的视频和slides链接放在末尾

由于精力有限,只记录了一些使用重点,复习了相应操作系统的概念,暂时没有对库进行深入研究。另外,这里介绍的只是并发编程基础,不要在此基础上直接构建大型项目,应该转而使用更高一层抽象的库,比如ASIO等

并发和并行

并发(concurrency)

强调一起(together),举例而言,假如:

你(cpu)在一边做slides(任务1),一边写邮件(任务2)。做slides和写邮件对应两个任务,但是你不能在同一个时刻做两个事情,在一个时刻上,你要么在做slides,要么在写邮件,不能两者兼有。

可以推知,单核单线程cpu上的分时操作系统一定只支持并发

并行(parallelism)

强调同时(parallel),举例而言:

你(cpu)在一边做slides(任务1),一边听音乐(任务2)。

做slides和听音乐是两个任务,并且在同一时刻两个任务都在进行。

那么单核多线程/多核多线程的cpu就可以支持并行。

关系

以下为个人理解:

多cpu的情况下,如果把$\set{task_{a1},taks_{a2}, \cdots,task_{an}}$分配给cpu1,把$\set{task_{b1},taks_{b2}, \cdots,task_{bn}}$分配给cpu2,那么第一个集合里的任务和第二个集合里的任务就是并行(parallelism),同一个集合里的任务关系就是并发(concurrency)

C++为什么关心并发

C++11之前,ISO 标准里没有线程的概念。

编译器重写

假设有这么一段代码:

1
2
3
4
5
6
int x=0;
x=1;
sleep(100ms);
x=2;
sleep(100ms);
x=3;

如果编译器将之重写为以下代码,会不会有问题?

1
2
int x=3;
sleep(200mx);

C++11之前,没有答案

C++11的回答:毫无模糊的——可以

因为当此线程正在修改x变量时,不允许其它线程来访问变量x;同时,并且没有做同步操作的情况下,不能保证此线程对变量x的修改和其它线程对变量x的访问不在同一时刻发生。

硬件重写

1
2
3
4
char a[1000]={};
a[0]=1;
a[100]=2;
a[1]=3;

以上的代码可能被硬件(非编译器)直接重写为[1]

1
2
3
4
5
6
7
8
9
cacheLine1 = a[0..63];
cacheLine1[0]=1;

cacheLine2=a[64...127];
cacheLine2[35]=2;
cacheLine1[1]=3;

a[0...63] = cacheLine1;
a[64...127] = cacheLine2;

C++11

  • 程序由一个或者多个可执行的线程组成
  • 每次某个线程向某个内存地址写入时,如果其它线程有可能读取或者写入此地址,那么这个操作必须被同步
  • 同步可以通过std提供的一系列工具实现, std::mutex , std::atomic

创建一个线程

1
2
3
4
5
std::thread threadA = std::thread tb([](){
puts("Hello from threadA");
});
puts("Hello from threadB");
tb.join();

创建次线程对象之后,它立刻开始“执行”(可以理解为进入就绪队列),也就是说没有std::thread::start()这种方法可调用

当线程的事情做完以后,它变得无事可做,此时它的状态是”joinable”。为什么要用join,可以把创建线程的时刻想象为树枝的分叉,在join函数调用之后新的分叉又回到了原来的分叉

如果join方法对应的的线程对象还没有做完工作,调用此方法的线程会被阻塞。上面的例子中,主线程可能会被阻塞

获取线程执行的结果

线程没有返回值,我们通常使用lambda表达式来交换数据

1
2
3
4
5
6
7
8
int result=0;
std::thread trd = std::thread tb([&](){
puts("Hello from threadA");
result=42;
});
puts("Hello from threadB");
tb.join();
std::cout<<"The value of result is "<<result;

此处我们在第八行直接打印了result的值,没有额外的同步操作,因为join函数本身就是同步的,在第八行只有可能有主线程,子线程已经退出了,所以也不会有数据竞争(data race)

数据竞争的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
using SC = std::chrono::steady_clock;
auto deadline= SC::now() + std::chrono::seconds(10);

int counter=0;
std::thread threadB = std::thread([&](){
while (SC::now() < deadline)
printf("B: %d\n",++counter);
});

while(SC::now() < deadline)
printf("A: %d\n",++counter);

threadB.join();

这段代码中,第七行和第十一行会引起数据竞争。不论这两个线程是并发还是并行,都会有数据竞争

在标准里规定,对于任何类型的数据而言,只要出现了数据竞争,那就是未定义行为。

使用 std::atomic

1
2
3
4
5
6
7
8
9
10
11
12
13
using SC = std::chrono::steady_clock;
auto deadline= SC::now() + std::chrono::seconds(10);

std::atomic<int> counter=0;
std::thread threadB = std::thread([&](){
while (SC::now() < deadline)
printf("B: %d\n",++counter);
});

while(SC::now() < deadline)
printf("A: %d\n",++counter);

threadB.join();

std::atomic的类型参数可以使任何原始数据类型(int, float, int*…)

标准中规定,所有对同一个atomic对象的操作都会被自动同步

注意

这种修改仅仅修复了物理层面的数据竞争,这段代码仍然有语义上的数据竞争,因为这段代码的执行结果是不可预测的,这取决于实际的线程调度。这可以算一个BUG,但是确实不是未定义行为

逻辑同步操作

上文提到了没有std::thread::start()方法, 当我们希望在创建线程对象和线程真正开始干活之间加上一些间隔时,如何操作呢?

如果使用C++的同步原语(primitive),代码应该写成这样:

1
2
3
4
5
6
7
8
std::thread threadB = std::thread([&](){
waitUntilUnblocked();
printf("Hellp from B\n");
});
printf("Hello from A\n");
unblockThreadB();
threadB.join();
printf("Hello again from A\n");

什么是原语呢?可以将至理解为一种更高层次的抽象,它定义了一种操作和这种操作导致的效果,但是不规定具体实现的细节。比如,标准库可能会提供10个工具来帮助我们实现函数waitUntilUnblocked()函数的效果,每个工具的实现各有千秋,但都能达到同样的结果。我们此处用原语统一概括之。

糟糕的实践: 忙等待

1
2
3
4
5
6
7
8
9
std::atomic<bool> ready=false;
std::thread threadB = std::thread([&](){
while(!ready){}
printf("Hello from B\n");
});
printf("Hello from A\n");
ready=true;
threadB.join();
printf("Hello again from A\n");

这段代码中,采取忙等待的方式。我们用std::atomic的方式把ready包起来了,所以不会有数据竞争,但是这样做仍然十分糟糕:

  1. 这里不是在等待,而是在自旋。 线程B一直在检查ready变量的值,直到它变为true才开始干活。检查变量的值这种操作也是指令,就会占用CPU的时间,这是一种比较严重的资源浪费

  2. 虽然没有数据竞争,仍然有可能引起未定义行为

    编译器发现线程B的工作并不可能直接影响ready变量的值。既然如此,编译器可能在执行开始时就将ready的值拉到寄存器里,然后用寄存器开始测试。编译器可能让线程B跳出循环,或者一直检查。

最简单的方法 std::mutex

调用std::mutex::lock()时,如果此互斥锁已经被某个线程锁过,但它还没有解锁,那么调用lock方法的线程会被阻塞。如果其没有被锁过或者已经被上一个线程解锁,那么此线程会正常执行。

1
2
3
4
5
6
7
8
9
10
std::mutex mtx;
mtx.lock();
std::thread threadB = std::thread([&](){
mtx.lock();mtx.unlock();
printf("Hello from B\n");
});
printf("Hello from A\n");
mtx.unlock();
threadB.join();
printf("Hello again from A\n");

这是一种解决方案,但是并不合适,因为std::mutex通常是用来做数据保护的,参考下面一段代码

1
2
3
4
5
6
7
8
9
10
11
12
class Tokenpool {
std: mutex mtx_;
std: vector< Token> tokens_;
Token gettoken(){
mtx_.lock();
if (tokens_.empty())
tokens_.push_back(Token::create()),
Token t=std: move(tokens_.back());
tokens_.pop back ();
mtx_.unlock();
return t;
};

因为vector是线程不安全的,我们需要手动同步对同一个vector变量的操作

数据保护必须完全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Tokenpool {
std: mutex mtx_;
std: vector< Token> tokens_;
Token gettoken(){
mtx_.lock();
if (tokens_.empty())
tokens_.push_back(Token::create()),
Token t=std: move(tokens_.back());
tokens_.pop back ();
mtx_.unlock();
return t;
}
size_t numTokensAvailable()const{
return tokens_.size();
}
};

如果仅仅在gettoken函数中进行保护,而对于numTokenAvailable函数不进行保护,那么会出现这样的情况:两个线程分别调用两个函数,可能会出现size和实际情况不一致

因此,需要做到对数据的100%保护。当然,在构造函数和析构函数里,可以不上锁,因为不会有两个个线程同时对一个对象执行构造或者析构

异常安全: std::lock_guard

回看上面的代码,如果在获取锁之后,但释放锁之前,push_back抛出了异常,那么锁就永远无法释放了。使用std::lock_guard<std::mutex>进行RAII地获取和释放,其构造函数将传入的互斥锁锁上,析构函数自动释放锁。

1
2
3
4
std::lock_gurad<std::mutex> lk(mtx_);

//after C++17, there is class template argument dedunction
std::lock_guard lx(mtx_);

互斥锁亦是一种资源

就像对于堆上分配的内存,我们有std::unique_ptr,对于互斥锁我们也提供了std::unique_lock

std::lock_guard不提供任何关于解锁的方法,它只会在其析构时解锁,如果你需要暂时解锁并重新枷锁,就得考虑使用std::unique_lock

std::unique_ptr相似,std::unique_lock不可拷贝,只可移动

1
2
3
4
5
6
7
8
9
10
11
12
13
unique_ptr<int> foo(uniue_ptr<int> p){
if(some condition){
p = nullptr;
}
return p;
}

unique_lock<mutex> foo(unique_lock<mutex> lk){
if(some condition){
lk.unlock();
}
return lk;
}

std::scoped_lock

C++17以后, 应该使用std::scoped_lock而不是std::lock_guard

在构造函数中,可以传入任意多个mutexstd::scoped_lock会同时将它们全都上锁,并且自动避免死锁

1
2
3
4
5
6
7
8
9
10
size_t numTokensAvailable()const{
std::scoped_lock lk(mtx_); // scoped_lock<mutex>
return tokens_.size();
}

void mergeTokensFrom(TokenPool& rhs){
std::scoped_lock lk(mtx_,rhs.mtx_); //scoped_lock<mutex,mutex>
tokens_.insert(rhs.tokens_.begin(),
rhs.tokens_.end());
}

条件变量

如果我们的TokenPool只能使用已经存在的Token,而不能自行新建Token塞到池子里,当池子里的Token用完了,仍有新的线程来索取Token时,需要用到条件变量。

这也是一个典型的生产者消费者问题

cv_.wait(lk)时,条件变量将原子地“将传入互斥锁解锁然后阻塞此线程”,直到某个其它线程对此条件变量调用notify系列方法以唤醒一个或者多个线程,当某个阻塞在此条件变量上的线程醒来时,会尝试重新获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct TokenPool{
std::vector<Token> tokens_;
std::mutex;
std::condition_variable cv_;

void returnToken(Token t){
std::unique_lock lk(mt_);
tokens_.push_back(t);
lk.unlock();
//因为要手动解锁,所以使用了unique_lock
cv_.notify_one();
}
Token getToken(){
std::unique_lock lk(mtx_);
while (tokens_.empty()){
cv_.wait(lk);
}
Token t=std::move(tokens_.back());
tokens_pop_back();
return t;
}
}

总结

如果在一个生产者消费者问题中,重复出现生产、消费的行为,应该使用互斥量和条件变量。如果这种过程只出现一次,使用std::futurestd::promise即可

静态变量初始化

线程t1t2哪一个会对变量obj进行初始化?当一个线程在进行初始化时,另一个线程在干什么呢?

1
2
3
4
5
6
7
8
int main(){
std::thread t1(foo),t2(foo);
t1.join();t2.join();
}
void foo(){
static ComplicatedObject obj("some","data");
std::cout<<"Hello from foo! obj.x is "<<obj.x<<"\n";
}

结论是,语言本身会保证只有一个线程执行初始化,在此期间其它访问此静态变量的线程会阻塞

单例模式

在C++11以前,为了让一个类线程安全地实现单例模式,你往往需要进行double check

但是在C++11之后,这就变得简单得多了

1
2
3
4
inline Singleton&  Singleton::getInstance(){
static SingletonFoo instance;
return instance;
}

std::call_once

如果你需要更灵活的一种方式实现让很多个线程至多做一件事一次,可以使用std::call_once

上面的单例模式的方式,有它的局限性,它不能作用于某个类的成员变量上,或者如果你希望根据一些运行时的条件,在函数A和函数B中选一个执行,但是只执行一次,使用std::call_once就比较合适

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Logger{
std::once_flag once_;
std::optional<NetworkConnection> conn_;
NetworkConnection& getConn(){
std::call_once(once_,[](){
conn_ = NetworkConnection(defaultHost);
});
return *conn_;
}
};//对类的成员变量进行操作

class X {
std::once_flag once;

void doSomething() {
std::call_once(once, []{ /* init ...*/ });
// ...
}

void doSomethingElse() {
std::call_once(once, []{ /*alternative init ...*/ });
// ...
}
};

C++11原语对比

读写锁

C++17 之后引入了std::shared_mutex,适用于读写锁的场景。

信号量

1
2
3
4
5
constexpr unsigned int max_count=100;
std::counting_semaphore<max_count> count={100};// initialize to 100

count.acquire() <-> semaphore--;
count.release() <-> semaphore++;

std::latch

std::latch可以将一定数量的线程阻塞,等到被阻塞的线程达到预设数量之后,统一放行

1
2
3
4
5
6
constexpr unsigned int num_of_thread_to_wait=2;
std::latch myLatch(num_of_thread_to_wait);

myLatch.arrive_and_wait(); //block if no enough threads have arrived
myLatch.arrive();//simply add the counter but don't block
myLatch.wait();//simply wait but don't increment the counter

std::barrier

std::barrier的功能和std::latch相似,但是它的计数器可以被重置,因此可以重复使用。

C++20原语对比

std::promise and std::future

对只生产消费一次的生产者/消费者问题,使用promise和future

1
2
3
4
5
6
7
8
9
10
11
12
std::future<int> f1= std::async([](){
puts("Hello from thread A!");
return 1;
});
//std::async will create a thread and starts executing immediately.
std::future<int> f2= std::async([](){
puts("Hello from thread B!");
return 2; // a thread don't have a return value. this return value goes into std::future
});

int sum=f1.get()+f2.get();
//block if the futures haven't gotten their result

数据共享准则

  • 使用互斥量保护共享数据

    • 不论读还是写,都必须保护
    • 在合适的场景使用合适的互斥量,std::mutex, std::shared_mutex
  • 生产者消费者问题适合使用互斥量+条件变量

  • 根本的解决方案:不要在线程之间共享数据

    • 让数据类型immutable

    • 使用时生成一份自己的拷贝,修改完成之后用同步的方法将新版本和旧版本merge

结论

  • 未被保护的数据竞争是未定义行为

    • 使用互斥量保护共享数据,包括读和写
  • 需要单例模式时,使用线程安全的静态变量初始化即可

    • 必要时使用std::call_once
  • 互斥量+条件变量非常实用

  • C++20 新增了计数原语

参考

video: Back to Basics: Concurrency - Arthur O’Dwyer - CppCon 2020 - YouTube

slides: CppCon2020/back_to_basics_concurrency__arthur_odwyer__cppcon_2020.pdf at main · CppCon/CppCon2020 (github.com)


  1. 大佬的slides此处似乎有一些问题,加上本人硬件基础不够,不太能理解

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!