原子类型和原子操作
并行编程、多线程与C++11
常见的并行编程有多种模型:共享内存、多线程、消息传递等。
多线程模型允许同一时间多个处理器单元执行统一进程中的代码部分,而通过分离的栈空间和共享的数据区及堆栈空间,现场可以拥有独立的执行状态以及进行快速的数据共享。
- POSIX pthread
- OpenMP
原子操作与C++11原子类型
#include <iostream>
#include <thread>
#include <atomic>
std::atomic_llong total {0};
void func(int) {
for (long long i = 0; i < 100000000LL; ++i) {
total += i;
}
}
int main(int argc, char **argv)
{
std::thread t1(func, 0);
std::thread t2(func, 0);
t1.join();
t2.join();
std::cout << total << std::endl;
return 0;
}
/// 9999999900000000
- std::atomic<T> t;
声明一个类型为 T 的原子类型变量 t 。编译器会保证产生并行情况下行为良好的代码,以避免线程间对数据 t 的竞争。
对于线程而言,原子类型通常属于“资源型”数据,这意味着多个线程通常只能访问单个原子类型的拷贝。因此,在C++11中,原子类型只能从其模板参数类型中进行构造,标准不允许原子类型进行拷贝、移动构造,以及使用 operator= 等,以防止发生意外。
atomic<float> af {1.2f};
atomic<float> af1 {af}; // error.
事实上,atomic 模板类的拷贝、移动、operator= 等总是默认被删除的。
不过从 atomic<T> 类型变量来构造其模板参数类型 T 的变量则是可以的,比如:
atomic<float> af {1.2f};
float f = af;
float f1 { af };
// 编译器会隐式地完成原子类型到其对应的类型的转换。
操作 | atomic_flag | atmic_bool | atomic-integeral-type | atomic<bool> | atomic<T*> | atomic<intergral-type> | Atomic<class-type> |
---|---|---|---|---|---|---|---|
test_and_set | Y | ||||||
clear | Y | ||||||
is_lock_free | Y | Y | Y | Y | Y | Y | |
load | Y | Y | Y | Y | Y | Y | |
store | Y | Y | Y | Y | Y | Y | |
exchange | Y | Y | Y | Y | Y | Y | |
compare_exchange_weak + strong | Y | Y | Y | Y | Y | Y | |
fetch_add, += | Y | Y | Y | ||||
fetch_sub, -= | Y | Y | Y | ||||
fetch_or,|= | Y | Y | |||||
fetch_and, &= | Y | Y | |||||
fetch_xor, ^= | Y | Y | |||||
++, -- | Y | Y | Y | Y |
- class type 是自定义类型
std::atomic<int> a;
int b = a; // 等价于: int b = a.load() ; a.load() 是原子的。
exchange 和 compare_exchange_weak/compare_exchange_strong 与平台密切相关。
std::atomic_flag
#include <iostream>
#include <atomic>
#include <thread>
#include <unistd.h>
std::atomic_flag lock = ATOMIC_FLAG_INIT;
/*
* lock 为 flase
*/
void f(int n)
{
// 通过 test_and_set 来设置 lock 为 true .返回旧的值,即 为 true.
// 即 自旋等待
while (lock.test_and_set(std::memory_order_acquire)) // 尝试获得锁
std::cout << "Waiting from thread " << n << std::endl;
std::cout << "Thread " << n << " starts working" << std::endl;
}
void g(int n)
{
std::cout << "Thread " << n << " is going to start." << std::endl;
lock.clear(); // 设置 lock 为 false
std::cout << "Thread " << n << " starts workding." << std::endl;
}
int main()
{
lock.test_and_set();
std::thread t1(f, 1);
std::thread t2(g, 2);
t1.join();
usleep(100);
t2.join();
/*
* 通过自旋锁达到了让 t1 线程 等待 t2 线程的效果。
*/
return 0;
}
void Lock(std::atomic_flag *lock) {
while (lock.test_and_set());
}
void Unlock(std::atomic_flag *lock) {
lock.clear();
}
内存模型,顺序一致性与 memory_order
- 顺序一致性(sequential consistent)的内存模型(memory model)
先看一段代码:
#include <atomic>
#include <thread>
#include <iostream>
std::atomic<int> a {0};
std::atomic<int> b {0};
int ValueSet(int) {
int t = 1;
a = t;
b = 2;
}
int Observer(int) {
std::cout << "(" << a << ", " << b << ")" << std::endl;
}
int main()
{
std::thread t1(ValueSet, 0);
std::thread t2(Observer, 0);
t1.join();
t2.join();
std::cout << "Got (" << a << ", " << b << ")" << std::endl; // Got (1, 2)
return 0;
}
/*
* Observer 可能出现的结果: (0, 0);(1, 2);(1, 0)
*/
思考: Observer 打印出 (0, 2) 这个结果合理吗?
- 如果认为程序是顺序是顺序执行的,那么这个结果必然是不合理的
- 但是程序一定是顺序执行吗?编译器不会对程序进行优化吗?
如果编译器认为 a , b 的赋值语句的执行的先后顺序对输出结果没有任何影响的话,则可以依情况将指令重排序(reorder)以提高性能。
如果我们假定,所有的原子类型的执行顺序都无关紧要,那么在多线程情况下就可能发生严重的错误:
#include <thread>
#include <atomic>
#include <iostream>
std::atomic<int> a;
std::atomic<int> b;
int Thread1(int) {
int t = 1;
a = t;
b = 2;
}
int Thread2(int) {
while (b != 2)
; // 自旋等待
std::cout << a << std::endl;
}
int main()
{
std::thread t1(Thread1, 0);
std::thread t2(Thread2, 0);
t1.join();
t2.join();
return 0;
}
假设,Thread1 中 a 的赋值语句的执行被排序到 b 的赋值语句之后,那么 Thread2 打印出的 a 的值可能为 0。
- 在 C++11 中的原子类型的变量在线程中总是保持着顺序执行的特性。称为“顺序一致性”。
那是不是说要所有的代码都要保持顺序一致性就好呢?答案是否定的,这样会影响性能。
- “内存模型”
内存模型通常是一个硬件上的概念,表示的是机器指令是以什么样的顺序被处理器执行。现代的处理器不是逐条处理,顺序执行机器指令的。
1: Loadi reg3, 1; # 将 立即数 1 放入到 reg3
2: Move reg4, reg3; # 将 reg3 的数据放入到 reg4
3: Store reg4, a; # 将寄存器 reg4 中的数据存入到内存地址 a
4: Loadi reg5, 2; # 将立即数2放到寄存器 reg5
5: Store reg5, b; # 将寄存器reg5的数据存入到内存地址b
上述汇编演示了
t = 1;
a = t;
b = 2;
如果处理器顺序执行所有指令的话,那么称为 内存模型为强顺序(Strong ordered)。
但是,指令1,2,3 和指令 4,5 在允许顺序上毫无影响。一些处理器就可能按照 1->4->2->5->3这样顺序执行。如果指令按这个顺序被处理器执行的话,称为弱顺序(weaked ordered)。
- 多线程情况的顺序一致性
强顺序:多线程总是共享代码的,那么强顺序意味着:对于多个线程而言,其看到的指令顺序是一致的。具体地,对于共享内存的处理器而言,需要看到内存中的数据被改变的顺序与机器指令中的一致。
例如:对于弱顺序内存模型,线程A看到的执行顺序是3,5. 线程B可能看到的是 5, 3.
现实:
强顺序:x86 以及 SPARC
弱顺序:Alpha、PowerPC、Itanlium、ARM v7
对于弱顺序的架构,如果要保证顺序执行,需要再汇编指令中加入一条所谓的内存栅栏(memory barrier)指令。
例如:在 PowerPC 中一条栅栏指令 sync。sync 之前的指令总是先于 sync 之后的指令。
1: Loadi reg3, 1; # 将 立即数 1 放入到 reg3
2: Move reg4, reg3; # 将 reg3 的数据放入到 reg4
3: Store reg4, a; # 将寄存器 reg4 中的数据存入到内存地址 a
4. Sync
5: Loadi reg5, 2; # 将立即数2放到寄存器 reg5
6: Store reg5, b; # 将寄存器reg5的数据存入到内存地址b
sync 指令对高度流水化的 PowerPC 处理器性能影响很大,因此,如果可以不顺序梯教育局的执行结果的话,可以保证弱顺序内存模型的处理器保持较高的流水线吞吐率(throughput)和运行时性能。
- 为什么有弱顺序的内存模型?
弱顺序的内存模型可以使得处理器进一步发掘指令张的并行性,是的执行执行的性能更高。
C++11 中定义的内存模型和顺序一致性跟硬件的内存模型的强顺序、弱顺序之间的关系?
编译器出于代码优化的考虑,会将指令前后移动,已获得最佳的机器指令的排列及产生最佳的运行时性能。对于C++11中的内存模型而言,要保证代码的顺序一致性,就必须同时做到以下几点:
- 编译器保证原子操作的指令间顺序不变,即保证产生的读写原子类型的变量的机器指令与代码编写者看到的一致。
- 处理器对原子操作的汇编指令的执行顺序不变。
在 C++11 中,原子类型成员函数总是保证了顺序一致性。对于 x86 平台,禁止了编译器对原子类型变量间的重排序优化;而对于 PowerPC 这样的平台来说,不仅禁止了编译器的优化,还插入了大量的内存栅栏。这对于意图提高性能的多线程而言,无疑是一种伤害。
在 C++11 中,设计者给出了解决方式就是让程序员为原子操作指定所谓的内存顺序:memory_order.
- memory_order_relaxed
#include <thread>
#include <atomic>
#include <iostream>
std::atomic<int> a;
std::atomic<int> b;
int Thread1(int) {
int t = 1;
a.store(t, std::memory_order_relaxed);
b.store(2, std::memory_order_relaxed);
}
int Thread2(int) {
std::cout << "(" << a << ", " << b << ")" << std::endl;
}
int main()
{
std::thread t1(Thread1, 0);
std::thread t2(Thread2, 0);
t1.join();
t2.join();
return 0;
}
memory_order_relaxed 可以任由编译器重排序或者由处理器乱序执行。
枚举值 | 定义规则 |
---|---|
memory_order_relaxed | 不对执行顺序做任何保证 |
memory_order_acquire | 本线程中,所有后续的读操作必须在本条原子操作完成后执行 |
memory_order_release | 本线程中,所有之前的写操作完成后才能执行本条原子操作 |
memory_order_acq_rel | 同时包含 memory_order_acquire 和 memory_order_release 标记 |
memory_order_consume | 本线程中,所有后续的有关原子操作,必须再本条原子操作完成后执行 |
memory_order_seq_cst | 全部存取都顺序执行 |
memory_order_seq_cst 表示该原子操作必须是顺序一致的,这是 C++ 11 中所有 atomic 原子操作的默认值。
通常情况下,我们可以把 atomic 成员函数可使用 memory_order 值分为以下3组:
- 原子存储操作(store) 可以使用: memory_order_relaxed, memory_order_release, memory_order_seq_cst
- 原子读取操作(load) 可以使用: memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_seq_cst
- RMW 操作(read-modify-write), 即一些需要同时读写的操作,比如 atomic_flag 的 test_and_set() 。可以使用: memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, memory_order_seq_cst。
"operator=", "operator+=" 函数都是 memory_order_seq_cst 作为 memory_order 参数的原子操作的简单封装。
#include <thread>
#include <atomic>
#include <iostream>
std::atomic<int> a;
std::atomic<int> b;
int Thread1(int) {
int t = 1;
a.store(t, std::memory_order_relaxed);
b.store(2, std::memory_order_relaxed);
}
int Thread2(int) {
while (b.load(std::memory_order_relaxed) != 2); // 自旋等待
std::cout << a.load(std::memory_order_relaxed) << std::endl;
}
int main()
{
std::thread t1(Thread1, 0);
std::thread t2(Thread2, 0);
t1.join();
t2.join();
return 0;
}
上述示例 a 的值可能是0,也可能是1.
如何做到“既快又对”?
我们所需的只是 a.store() 先于 b.store(), b.load() 先于 a.load() 。
#include <thread>
#include <atomic>
#include <iostream>
std::atomic<int> a;
std::atomic<int> b;
int Thread1(int) {
int t = 1;
a.store(t, std::memory_order_relaxed);
b.store(2, std::memory_order_release); // 该原子操作前所有的写原子操作必须完成
}
int Thread2(int) {
while (b.load(std::memory_order_acquire) != 2); // 该原子操作必须完成才能执行之后所有
std::cout << a.load(std::memory_order_relaxed) << std::endl;
}
int main()
{
std::thread t1(Thread1, 0);
std::thread t2(Thread2, 0);
t1.join();
t2.join();
return 0;
}
- memory_order_release 和 memory_order_acquire 常常结合使用,我们称这种内存顺序为 release-acquire 内存顺序.
- 通常,“先于发生”关系总是传递的,比如原子操作A发生于原子操作B之前,而原子操作B又发生于原子操作C之前的话,则A一定发生于C之前。
#include <thread>
#include <atomic>
#include <iostream>
#include <cassert>
std::atomic<std::string *> ptr;
std::atomic<int> data;
void Producer()
{
std::string *p = new std::string("Hello");
data.store(42, std::memory_order_relaxed);
ptr.store(p, std::memory_order_release);
}
void Consumer()
{
std::string *p;
while (!(p = ptr.load(std::memory_order_consume)))
;
assert(*p == "Hello");
assert(data.load(std::memory_order_relaxed) == 42); //
}
int main()
{
std::thread t1(Producer);
std::thread t2(Consumer);
t1.join();
t2.join();
return 0;
}
保证了 ptr.load(std::memory_order_consume) 必须发生在 *ptr (实际上是 ptr.load) 这样的解引用操作之前。
std::memory_order_consume 只是保证原子操作发生在与 ptr 有关原子操作之前。
- memory_order_release 和 memory_order_consume 的配合建立“生产-消费”的同步顺序.