0%

并行编程

参考资料

《C++ Concurrency In Action》
《Concurrency With Modern CPP》

并发和并行的概念

在一般的理念中,并发是指 多个thread 请求同一个CPU资源,需要来回切换context;并行是指 每个thread 跑在单独的CPU核上。

在我的工作实践和理解中, 并行是指充分利用CPU多核优势,将代码分解成可以同时运行的模块,当然这里可以同时运行,并不意味着线程间完全没有交互或者耦合,实际工作中我遇到的基本上线程间还是要访问共享内存的,很少遇到那种[0,10e8]相加这种可以完全没有耦合的情况。

并行可以分成两种:

  1. 数据并行
    数据并行是指 可以将数据切分,然后每个部分同时执行相同的逻辑,最终合并结果就可以了。 这是非常完美符合并行概念的。
  2. 业务并行
    业务并行,就是将业务切分,每个线程执行其中一个业务,但每个业务中数据是全程参与的,比如业务分为A B C D,raw_data 进入A, 出来data1 进入B,出来data2 进入C, 出来data3 进入D。 当然这里的raw_data必须是流式的,否则不如单线程 顺序处理所有业务。

是否使用并发(并发的优缺点)

并发并不一定比非并发要好,要看具体的使用场景。

业务分析

分析一段业务逻辑是否可以采用多线程,至于能否充分利用硬件并行的概念再说(我的理解是,将简单的任务切分成子任务 增加的负担 远远超过了带来的益处,负担有可能是代码复杂度的提升,减少的时间远远不能弥补,子任务切分的太多,远远超过了硬件所支持的最优线程数,导致context切换负担太重)。

总之,我们为了性能而使用并发:它可以大幅度提高应用的性能,但也很大可能让代码更加复杂,难以理解,并且更容易出错。因此,应用中只有特别影响性能的部分,我们才值得尝试优化,一句话,一切为了性能。

并发的优点

Todo

并发的缺点

Todo

C++线程库概述

C++11开始提供thread库,一直以来大家都有顾虑,就是抽象的代价高不高?

从网上的资料看,标准委员会在设计线程库时,就力求实现相同功能的情况下,高级API和底层API的性能收益相当。而且很多情况下,使用底层API带来的一点点便利,也会带来更高的代码复杂度和出错率,这样是不划算的。即便是瓶颈出现在C++标准库的工具中,也可能由低劣的程序设计造成。例如,如果过多的线程竞争一个互斥单元,将会很明显的影响性能。与其在互斥操作上耗费时间,不如重新设计,减少互斥单元上的竞争。

如果系统API提供了一些线程库没有的功能,那么还可以通过native_handle获取句柄 来调用系统 API。

线程的创建启动和执行

1 thread只支持move constructor && move assignment operator, 不支持拷贝构造和左值引用的operator= 重载。
2 thread需要在程序结束前调用join or detach,调用前要先判断joinable
3 如果发生异常,如何保证一定会调用join or detach呢? C++20有jthread来解决这个问题,[C++11, C++17] 的 版本需要自己控制,比如thread_guard这种。
4 thread构造时 传递的参数是 传值方式,参数被被拷贝到新线程的内存空间(如同临时变量),即使函数中的参数是引用的形式,拷贝操作也会执行。

1
2
void f(int i, std::string const& s);
std::thread t(f, 3, "hello");

另外,传递参数 有可能会在线程 detach之前 __失去意义__, 导致未定义的行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

//wrong
void f(int i,std::string const& s);
void oops(int some_param)
{
char buffer[1024]; // 1
sprintf(buffer, "%i",some_param);
std::thread t(f,3,buffer); // 只复制了buffer指针
t.detach();
}


//right
void f(int i,std::string const& s);
void not_oops(int some_param)
{
char buffer[1024];
sprintf(buffer,"%i",some_param);
std::thread t(f,3,std::string(buffer)); // 使用std::string,避免悬空指针,
t.detach();
}

如果thread proc要求的参数是非常量引用呢

1
2
3
4
5
6
7
8
9
10
void update_data_for_widget(widget_id w,widget_data& data); // 1
void oops_again(widget_id w)
{
widget_data data;
std::thread t(update_data_for_widget,w,data); //wrong
std::thread t(update_data_for_widget,w,std::ref(data)); //right
display_status();
t.join();
process_widget_data(data);
}

类似于unique_ptr, 线程的所有权 可以在std::thread实例中转移,这依赖于std::thread的 moveable && uncopyable。 不可复制的含义是—— 在某一个时间点,一个thread
实例只能关联一个执行线程。

所有权转移

C++ 标准库中有很多资源占有(resource owning)类型,比如ifstream unique_ptr还有std::thread, 都是可移动,不可复制。

所有权转移之后,thread实例就不能调用join 或者 detach。

这里有scoped_thread和jthread,可以参考,也就是说 我们在谈转移权的时候,原先的线程 必须满足joinable()。

1
2
3
4
void f();
thread t(f);
t.detach();
thread x = std::move(t);

对x来讲, 此时joinable必然是false,好像也没有什么影响,那就是析构的时候,也判断joinable,来决定是否detach或者join。

从代码上看,scoped_thread 和 jthread 在这一块的判断不同, jthread是在几乎在所有成员函数中 都判断了joinable,但区别是,jthread在构造的时候 没有判断t是否joinable,而scoped_thread在构造中判断如果joinable为false,
那么直接抛出异常,这就比较奇怪了。

确定线程数量以及简单的计算一个范围内结果的小例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);

if(!length) // 1
return init;

unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 2

unsigned long const hardware_threads=
std::thread::hardware_concurrency();

unsigned long const num_threads= // 3
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

unsigned long const block_size=length/num_threads; // 4

std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads-1); // 5

Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); // 6
threads[i]=std::thread( // 7
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));
block_start=block_end; // 8
}
accumulate_block<Iterator,T>()(
block_start,last,results[num_threads-1]); // 9

for (auto& entry : threads)
entry.join(); // 10

return std::accumulate(results.begin(),results.end(),init); // 11
}

这就是纯粹的数据并行化概念,把数据分成几份,每份数据都执行相同的计算过程。
当然,为了得到最终结果,必须等待所有线程执行完毕。

在我们通过GL刷新图像的时候,经常会遇到一个现象,有时候刷新率特别低,就会出现 屏幕的不同部分 的刷新频率不同,导致画面的不同部分 不同步。

线程标识

std::thread::get_id() 可以获取获取 线程ID,当然native_handle() 也能获取 handle,这里的handlle 是系统层级的handle,可以传入系统API使用。
而ID是纯标识,不同线程的ID不会相同, 但理论上handle应该也不会相同,那直接比较handle也可以? 如果也可以的话,好像ID 有点多余。

共享数据

共享数据也是没有办法的事情, 线程间如果有业务逻辑的耦合,就要有数据交换,除了socket,我能想到的方式 都涉及到 共享数据(全局变量 )。

共享数据的难点 在于 read-write,当有至少一个线程在修改共享数据的时候, 就要解决同步问题。读的时候要锁定数据不被修改(但是可以允许其他线程读,不知道读写锁 是不是为了解决这个问题), 写的时候更要锁定。

如何避免恶性数据竞争(race condition)

  • 某种保护机制
  • 无锁编程
  • software transactional memory(STL, 软件事物内存)

在《C++ Concurrency in Action》 中提到 无锁编程 “无论是内存模型上的细微差异,还是线程访问数据的能力,都会让工作量变的很大,而且这种方式也很难得到正确的结果”。STM类似于数据库里的事物操作,但C++中没有对STM直接支持,
而且这也是一个很热门的理论研究领域,暂时就不做讨论。

使用互斥量

mutex是经常会用到的一种保护机制,直接锁上就可以了,但存在两个问题

  • 多个锁,有可能导致死锁(dead lock)
  • 锁定过多,导致效率低下,耗时增加;锁定过少,没有保护好数据

std::mutext有lock unlock接口,不建议直接调用成员函数,而是利用RAII机制,使用标准库提供的lock_guard。

1
2
注:C++17 添加了新特性 模板类参数推导 lock_guard可以省略参数 lock_guard guard(some_mutex), C++17还提供了一种加强版数据保护机制scoped_lock

那么对代码加锁就可以了吗?如果函数返回的是 被保护数据的指针或者引用,就会破坏数据,所以要对接口谨慎设计,要确保mutex能锁住数据访问,不留后门。

接口中的条件竞争

以std::stack的实现举例。

stack需要支持的操作 构造、swap、push、pop、top、empty、size

empty size top是读操作,push pop swap是写操作。

如果不加锁,empty size top返回的结果可能在返回时是正确的,但并不可靠。如果同时有多个线程push pop swap也会出现问题。

前面我们提到了 返回数据 导致 接口内部加锁 失效的问题, 那么我们将接口返回值 都改成 返回 数据的拷贝 这样能解决问题吗? 不能。对size empty来说,返回值当时是正确的,但接下来进行pop push top 并不能依靠上面size emtpy返回的结果,除非对这一整段代码都加锁。
类似

1
2
3
4
5
6
7
8
9
10
11
12
stack s;
void do_something(){
lock_guard lck(some_mutex);
if(!s.empty()){
if(s.size() >= 10){
//do some work
}else{
//do some work
}
}else{
//do some work
}

所以不仅仅是接口内部的问题,是接口本身所实现功能的问题。

下面我们以pop为例解释下接口竞争以及相应的解决方案。

1
2
3
stack<int> s;
s::empty();
s::pop();

上面的代码中,先调用empty判断是否为空,不为空的话,就pop栈顶元素。很明显,如果不在整段代码加锁的话,在我们执行pop的时候,很可能其它线程操作导致stack为空了,此时pop该怎么操作呢?抛出异常是一个选项,但这样empty本身就没有意义了,反正为空pop就抛出异常,那么我们也不需要再check empty了。

于是设计者想出了top,top为空就抛出异常,这样empty为空的情况下也要抛出异常,这让empty变成了一个无用的函数。

比如stack< vector >,我们在返回的时候会调用构造函数,但如果构造函数崩溃(系统资源不足或者其它原因),抛出异常,这时候返回失败了,不确定stack上的元素是否已经被移除了。(因为我不确定,如果抛出bad_alloc异常,s.pop()是否还会返回?返回的话,stack已经确实将数据移除了,这时候的情况是:stack 元素确实被移除了,但数据没有正常返回,因为构造失败)。

top确实能解决上面的问题,但引入了新的接口竞争。

当然,问题也有解决方案,但都有相应的代价。

选项1:传引用

1
2
std::vector<int> result;
some_stack.pop(result);

传引用的问题是,在哪里分配空间,有人说引用肯定是在函数调用外部了。对,基本类型是这样的没错,但如果是指针呢? 如果是自定义类型呢? 看下面的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void f1(vector<int>& result) {
result.push_back(10);
}


void f2(vector<vector<int>>& result) {
vector<int> temp;
for (int i = 0; i < 100; ++i) {
temp.push_back(i);
}
result.push_back(temp);
}


void f3(vector<vector<int>*>& result) {
vector<int> temp;
for (int i = 0; i < 1000000; ++i) {
temp.push_back(i);
}
result.push_back(&temp);//离开作用域的时候,temp会析构,这样虽然temp里添加了一个元素,但这个元素size为0
}

int main(int argc, char* argv[]) {

vector<int> r1;
vector<vector<int>> r2;
vector<vector<int>*> r3;
f1(r1);
f2(r2);
f3(r3);
return 0;
}

当然传引用的时候肯定都会遇到这样的问题,如果引用本身是复杂类型,那么还要关注引用内部是否有分配到子函数call stack上的变量,这样离开作用域就会析构。而r3在离开作用域的时候又要析构子函数。

书上说

1
这种方式还不错,缺点也很明显:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。对于其他的类型,这样也不总行得通,因为构造函数需要的参数,在这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。

最开始这里没理解,为什么要构造一个栈中类型的实例,以为是在子函数中构造。实际上是这样的,一般情况下,我们认为弹出元素最好是move,这样就不需要再构造。

选项2:无异常抛出的拷贝构造函数或者移动构造函数

移动构造不会抛出异常,所以对pop来讲,如果确实要返回值的版本,那么直接返回右值或者右值引用肯定是最合适的,移动构造也不会抛出异常。

这一点没办法确保。移动构造函数当然好,还是有一些类(无论是历史遗留还是其它原因),没有移动构造函数,而拷贝构造函数此时如果涉及到某些操作,确实是需要抛出 异常的。

选项3:返回指向弹出值的指针

这时候要进行内存管理,最好返回shared_ptr。

选项4:“选项1 + 选项2” 或 “选项1 + 选项3”

例子: 定义线程安全的堆栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <exception>
#include <memory>
#include <mutex>
#include <stack>

struct empty_stack: std::exception
{
const char* what() const throw() {
return "empty stack!";
};
};

template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;

public:
threadsafe_stack()
: data(std::stack<T>()){}

threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data; // 1 在构造函数体中的执行拷贝
}

threadsafe_stack& operator=(const threadsafe_stack&) = delete;

void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}

std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空

std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值
data.pop();
return res;
}

void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();

value=data.top();
data.pop();
}

bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

deal-lock问题

两个或以上mutex因为互相等待而导致的 永远也没有一方能完整获得 所有mutex的 现象,我称为deal lock。

C++11开始提供了std::lock来一次性锁定两个或以上的mutex(避免死锁算法是STL内部实现的)。

在谈论std::lock之前,先谈论以下几个概念

  • BasicLockable : 支持lock unlock
  • Lockable : 支持lock unlock try_lock
  • TimedLockable : 支持lock unlock try_lock try_lock_for try_lock_until

目前,BasicLockable有四个 mutex recursive_mutex timed_mutex recursive_timed_mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
template< class Lockable1, class Lockable2, class... LockableN >
void lock( Lockable1& lock1, Lockable2& lock2, LockableN&... lockn );

#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <functional>
#include <chrono>
#include <string>

struct Employee {
Employee(std::string id) : id(id) {}
std::string id;
std::vector<std::string> lunch_partners;
std::mutex m;
std::string output() const
{
std::string ret = "Employee " + id + " has lunch partners: ";
for( const auto& partner : lunch_partners )
ret += partner + " ";
return ret;
}
};

void send_mail(Employee &, Employee &)
{
// 模拟耗时的发信操作
std::this_thread::sleep_for(std::chrono::seconds(1));
}

void assign_lunch_partner(Employee &e1, Employee &e2)
{
static std::mutex io_mutex;
{
std::lock_guard<std::mutex> lk(io_mutex);
std::cout << e1.id << " and " << e2.id << " are waiting for locks" << std::endl;
}

// 用 std::lock 获得二个锁,而不担心对 assign_lunch_partner 的其他调用会死锁我们
{
std::lock(e1.m, e2.m);
std::lock_guard<std::mutex> lk1(e1.m, std::adopt_lock);//
std::lock_guard<std::mutex> lk2(e2.m, std::adopt_lock);
// 等价代码(若需要 unique_locks ,例如对于条件变量)
// std::unique_lock<std::mutex> lk1(e1.m, std::defer_lock);
// std::unique_lock<std::mutex> lk2(e2.m, std::defer_lock);
// std::lock(lk1, lk2);
// C++17 中可用的较优解法
// std::scoped_lock lk(e1.m, e2.m);
{
std::lock_guard<std::mutex> lk(io_mutex);
std::cout << e1.id << " and " << e2.id << " got locks" << std::endl;
}
e1.lunch_partners.push_back(e2.id);
e2.lunch_partners.push_back(e1.id);
}
send_mail(e1, e2);
send_mail(e2, e1);
}

int main()
{
Employee alice("alice"), bob("bob"), christina("christina"), dave("dave");

// 在平行线程指派,因为发邮件给用户告知午餐指派,会消耗长时间
std::vector<std::thread> threads;
threads.emplace_back(assign_lunch_partner, std::ref(alice), std::ref(bob));
threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(bob));
threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(alice));
threads.emplace_back(assign_lunch_partner, std::ref(dave), std::ref(bob));

for (auto &thread : threads) thread.join();
std::cout << alice.output() << '\n' << bob.output() << '\n'
<< christina.output() << '\n' << dave.output() << '\n';
}

注意三种写法

写法1

1
2
3
std::lock(e1.m, e2.m); //已经把e1.m e2.m都lock住了
std::lock_guard<std::mutex> lk1(e1.m, std::adopt_lock);//所以adopt_lock就意味着不用再lock了
std::lock_guard<std::mutex> lk2(e2.m, std::adopt_lock);

之前有个疑问是,如果std::lock都已经lock了,为什么还需要lock_guard?因为std::lock,没有std::unlock,所以这里用一个RAII的lock_guard来保证释放mutex。

写法2

1
2
3
std::unique_lock<std::mutex> lk1(e1.m, std::defer_lock);
std::unique_lock<std::mutex> lk2(e2.m, std::defer_lock);
std::lock(lk1, lk2);

首先unique_lock满足BasicLocable要求,如果模板参数mutex满足Lockable,那么unique_lock满足Lockable,如果mutex满足TimedLockable,那么unique_lock满足TimedLockable。

也就是说,unique_lock取决于模板参数。

既然有了unique_lock,那么shared_lock自然也不会缺席,模板参数必须满足SharedMutex要求。

写法3

1
std::scoped_lock lk(e1.m, e2.m); //C++17开始支持

这里scoped_lock代替了lock_guard和unique_lock的RAII特性。

那么scoped_lock除了RAII特性,减少代码量,相比于std::lock还有哪些优势? ToDo

std::lock的出现将程序员从dead-lock的困扰中解脱出来,让代码变得更加简洁

避免死锁的建议

  • 避免嵌套锁
  • 避免在持有锁时调用外部代码,因为外部代码有可能做任何事情,包括获取锁。
  • 使用固定顺序获取锁(一个思路是 定义 遍历的顺序,一个线程必须先锁住A 才能获取B的锁,在锁住B之后才能获取C的锁,依次类推)
  • 使用层次锁结构

层次锁,实际上就是 使用固定顺序获取锁的一种实现方式。
有mutex A, B thread t1, t2
假如 t1要 A.lock B.lock这样的顺序 获取锁的控制权, t2如果 B.lock A.lock,这种情况下t1 t2就有可能死锁,所以我们可以规定好 必须按照 A B的顺序来获取mutex控制权。
但如果锁多了,那该怎么办呢? 层次锁就是这种思想的一个抽象概念,把锁分成不同的level,如果目前获取的是low level,那么就不可以再获取high level,当然如果你的业务代码中 先获取了B,但是也不需要获取A,那么也是满足这个原则的。

层次锁 代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class hierarchical_mutex
{
std::mutex internal_mutex;

unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;

static thread_local unsigned long this_thread_hierarchy_value; // 1

void check_for_hierarchy_violation()
{
if(this_thread_hierarchy_value <= hierarchy_value) // 2
{
throw std::logic_error(“mutex hierarchy violated”);
}
}

void update_hierarchy_value()
{
previous_hierarchy_value=this_thread_hierarchy_value; // 3
this_thread_hierarchy_value=hierarchy_value;
}

public:
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value),
previous_hierarchy_value(0)
{}

void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock(); // 4
update_hierarchy_value(); // 5
}

void unlock()
{
if(this_thread_hierarchy_value!=hierarchy_value)
throw std::logic_error(“mutex hierarchy violated”); // 9
this_thread_hierarchy_value=previous_hierarchy_value; // 6
internal_mutex.unlock();
}

bool try_lock()
{
check_for_hierarchy_violation();
if(!internal_mutex.try_lock()) // 7
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); // 8