原子操作,即不可分割开的操作;该操作一定是在同一个cpu时间片中完成,这样即使线程被切换,多个线程也不会看到同一块内存中不完整的数据。
如何获得原子操作
- gcc在4.0.1版本后就通过其内建函数支持原子操作。在这之前编程真必须要通过参考各种cpu的指令操作手册,用其汇编指令编写原子操作。而gcc通过内建函数屏蔽了这些差异。gcc支持如下原子操作:
#if (GCC_VERSION >= 40100)
/* 内存访问栅 */
#define barrier() (__sync_synchronize())
/* 原子获取 */
#define AO_GET(ptr) ({ __typeof__(*(ptr)) volatile *_val = (ptr); barrier(); (*_val); })
/*原子设置,如果原值和新值不一样则设置*/
#define AO_SET(ptr, value) ((void)__sync_lock_test_and_set((ptr), (value)))
/* 原子交换,如果被设置,则返回旧值,否则返回设置值 */
#define AO_SWAP(ptr, value) ((__typeof__(*(ptr)))__sync_lock_test_and_set((ptr), (value)))
/* 原子比较交换,如果当前值等于旧值,则新值被设置,返回旧值,否则返回新值*/
#define AO_CAS(ptr, comp, value) ((__typeof__(*(ptr)))__sync_val_compare_and_swap((ptr), (comp), (value)))
/* 原子比较交换,如果当前值等于旧指,则新值被设置,返回真值,否则返回假 */
#define AO_CASB(ptr, comp, value) (__sync_bool_compare_and_swap((ptr), (comp), (value)) != 0 ? true : false)
/* 原子清零 */
#define AO_CLEAR(ptr) ((void)__sync_lock_release((ptr)))
/* 通过值与旧值进行算术与位操作,返回新值 */
#define AO_ADD_F(ptr, value) ((__typeof__(*(ptr)))__sync_add_and_fetch((ptr), (value)))
#define AO_SUB_F(ptr, value) ((__typeof__(*(ptr)))__sync_sub_and_fetch((ptr), (value)))
#define AO_OR_F(ptr, value) ((__typeof__(*(ptr)))__sync_or_and_fetch((ptr), (value)))
#define AO_AND_F(ptr, value) ((__typeof__(*(ptr)))__sync_and_and_fetch((ptr), (value)))
#define AO_XOR_F(ptr, value) ((__typeof__(*(ptr)))__sync_xor_and_fetch((ptr), (value)))
/* 通过值与旧值进行算术与位操作,返回旧值 */
#define AO_F_ADD(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_add((ptr), (value)))
#define AO_F_SUB(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_sub((ptr), (value)))
#define AO_F_OR(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_or((ptr), (value)))
#define AO_F_AND(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_and((ptr), (value)))
#define AO_F_XOR(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_xor((ptr), (value)))
#else
#error "can not supported atomic operation by gcc(v4.0.0+) buildin function."
#endif /* if (GCC_VERSION >= 40100) */
/* 忽略返回值,算术和位操作 */
#define AO_INC(ptr) ((void)AO_ADD_F((ptr), 1))
#define AO_DEC(ptr) ((void)AO_SUB_F((ptr), 1))
#define AO_ADD(ptr, val) ((void)AO_ADD_F((ptr), (val)))
#define AO_SUB(ptr, val) ((void)AO_SUB_F((ptr), (val)))
#define AO_OR(ptr, val) ((void)AO_OR_F((ptr), (val)))
#define AO_AND(ptr, val) ((void)AO_AND_F((ptr), (val)))
#define AO_XOR(ptr, val) ((void)AO_XOR_F((ptr), (val)))
/* 通过掩码,设置某个位为1,并返还新的值 */
#define AO_BIT_ON(ptr, mask) AO_OR_F((ptr), (mask))
/* 通过掩码,设置某个位为0,并返还新的值 */
#define AO_BIT_OFF(ptr, mask) AO_AND_F((ptr), ~(mask))
/* 通过掩码,交换某个位,1变0,0变1,并返还新的值 */
#define AO_BIT_XCHG(ptr, mask) AO_XOR_F((ptr), (mask))
- 以加法指令操作实现 x = x + n为例 ,gcc编译出来的汇编形式上如下:
...
movl 0xc(%ebp), %eax
addl $n, %eax
movl %eax, 0xc(%ebp)
...
可以看出,实现这条c语句,需要先将x所在内存0xc(%ebp)
中的值装载到寄存器%eax
中,然后用addl
指令进行与一个立即数$n
进行加操作,之后再寄存器中的结果装载回原内存中。如果在时序上又另一个线程也操作该内存中的值,且在指令addl $n, %eax
完成之后,时间片切换到了另一个线程中,该线程进行了该内存的修改操作,而且还会在后续的操作中使用,这个时候发生又发生时间片切换,切回到原线程中,进行movl %eax, 0xc(%ebp)
指令覆盖了前一个线程修改内容,如果在这时再切换到另一个线程中,该线程就会使用到一个错误的值进行后续的操作。
- gcc的原子操作是内建函数通过汇编实现的,统一命名以
__sync_xxx()
起头,原子操作做了什么事情呢?原子操作的原理都是通过汇编指令lock
在各种xadd
、cmpxchg
或xchg
指令前进行锁定操作内存的总线,并将上述的普通3条指令的操作合并为一条操作,因为内存与cpu都是通过总线进行数据交换,所以即使其它cpu核也同时(真正意义上的多线程,而不是单核上的时间片切换)要对该内存的存取,也要等待。(因为我不是低层开发人员,所以具体时序和动作我不是太了解,只能以应用层的锁动作理解这里的总线锁,如果你了解,请更正),而被锁总线的单核应该不会进行时间片切换,直到该指令完成。 - 除了多线程操作同一个内存时会发生数据的一致性错误,因为编译器的优化问题也会造成数据一致性问题。如果你的原意要进行如下的操作:
int a = 0;
int b = 0;
void A() {
a = 1;
b = 2;
}
void B() {
if (b > 0)
printf("a :%d\n", a);
}
那么经过编译器的优化,A()中的两条复制语句可能被调换顺序,如果两个线程分别同时执行A()和B(),那么因为这个原因,B()可能输出1,也可能输出0;解决方法是让a = 1
一定在b = 2
执行,那么在两者之间插入内存栅栏__sync_synchronize()
可以保证先后次序。(因为我对这样的优化发生情况不是很明了,故这里不能详细的描述这样的优化对同线程产生的影响)
- 原子操作的内存,要保证其内容已定是存取最新的,而不是cache中的数据,所以要用
volatile
关键字表明,这样每次存取cpu直接存取内存,而非cache中的数据,我们定义一个原子类型:
#ifndef AO_T
typedef volatile long AO_T;
#endif
原子操作与普通C语句的等效操作
这里用上面定义的宏说明原子操作,等效的C语言非原子的操作为了保证一致性,我们使用
lock()
和unlock
这个伪语句表示锁的加锁和解锁。当然原子操作要比应用层加锁快了太多太多。
- 内存栅栏使用
int a = 0;
barrier();
int b = 2;
保证a的复制在b的复制前执行
- 原子获取
int a = 5;
int b = AO_GET(&a); //b==5;
int a = 5;
lock();
int b = a;
unlock();
保证读取a的值是内存中的值,而不是寄存器或cache中的值
- 原子设置
int a = 0;
AO_SET(&a, 10); //a==10;
int a = 0;
lock();
a = 10;
unlock();
- 原子交换
int a = 10;
AO_SWAP(&a, 9);
int a = 10;
lock();
if (a != 9)
a = 9;
unlock();
- 原子比较交换
int a = 10;
int b = AO_CAS(&a, 10, 9); //b==10, a==9;
int c = AO_CAS(&a, 9, 8); //c==8, a==10;
int a = 10;
int b = 0;
int c = 0;
lock();
if (a == 10) {
b = a;
a = 9;
} else {
b = 10;
}
unlock();
lock();
if (a == 9) {
b = a;
a = 8;
} else {
b = 9;
}
unlock();
AO_CASB()的逻辑与AO_CAS()一致,只是返还一个真假值判断是否发生了交换,就不再赘诉了。
- 原子清零
int a = 10;
AO_CLEAR(&a); //a==0;
int a = 10;
lock();
a = 0;
unlock();
- 先操作后使用的加减运算和逻辑运算
- 先加一个数,再使用和值
AO_xxx_F()
中的F
表示fetch
提取的意思
int a = 1; int b = AO_ADD_F(&a, 10);//a==11, b==11
int a = 1; int b = 0; lock(); a += 10; b = a; unlock();
- 其它的运算(减,或,与,异或)与加法操作逻辑一样,就不再赘诉了
- 先加一个数,再使用和值
- 先使用后操作的加减运算与逻辑运算
- 使用原值,后加上一个数
int a = 1; int b = AO_F_ADD(&a, 10);//a==11, b==1
int a = 1; int b = 0; lock(); b = a; a += 10; unlock();
何时使用原子操作最合适
原子操作最合适用来管理状态,而且最好是程序发现状态不符合自己要求是,可以忽略这个错误,继续运行,或稍后在此尝试。比如我们使用一个local static
变量存储当前系统有多少个cpu核,以备给出一些策略,比如以后我们要实现的自旋锁中的休眠。代码如下:
long GetCPUCores()
{
static long g_CPUCores = 0;
long gcpus = -1;
/*原子获取,如果没有设置过,则继续,否则返回这个值*/
if (likely((gcpus = AO_GET(&g_CPUCores)) != -1)) {
return gcpus;
}
gcpus = sysconf(_SC_NPROCESSORS_CONF);
if (unlikely(gcpus < 0)) {
printf("Get number of CPU failure : %s", strerror(errno));
abort();
}
/*原子设置*/
AO_SET(&g_CPUCores, gcpus);
return gcpus;
}
如果有多个线程同时调用,或单个线程多次调用,我们都可以保证g_CPUCores
中数据的有效性,不会出现获取到一个大于0到假值导致后续的逻辑错误。而且这样的设计,还可以提高效率,如果获取的系统参数是一个像
#ifdef __APPLE__
gtid = syscall(SYS_thread_selfid);
#else
gtid = syscall(SYS_gettid);
#endif
的真正的系统调用,那么在结果固定的情况下,代价是昂贵的,因为程序必须要发起中断服务,切换到内核空间调用代码为SYS_thread_selfid
或SYS_gettid
的中段服务,从而得到线程ID(线程是一个轻量级的进程,只不过它的堆空间与其它线程共享,而不是进程那样是彼此独立的,我以后会在此细谈这个ID值的运用)。
改进上一篇文章中提及的结构魔数操作
上一节我们说过,使用带魔数字段结构的函数通过判断、修改魔数做出相应的操作,试想如果两个线程同时操作魔数字段,肯定会带来冲突,所以我们将其对应的非原子操作,改为原子操作,代码如下:
/*
* 魔数
* 结构体中设置一个magic的成员变量,已检查结构体是否被正确初始化
*/
#if !defined(OBJMAGIC)
#define OBJMAGIC (0xfedcba98)
#endif
/*原子的设置魔数*/
#undef REFOBJ
#define REFOBJ(obj) \
({ \
int _old = 0; \
bool _ret = false; \
if (likely((obj))) { \
_old = AO_SWAP(&(obj)->magic, OBJMAGIC); \
} \
_ret = (_old == OBJMAGIC ? false : true); \
_ret; \
})
/*原子的重置魔数*/
#undef UNREFOBJ
#define UNREFOBJ(obj) \
({ \
bool _ret = false; \
if (likely((obj))) { \
_ret = AO_CASB(&(obj)->magic, OBJMAGIC, 0); \
} \
_ret; \
})
/*原子的验证魔数*/
#undef ISOBJ
#define ISOBJ(obj) ((obj) && AO_GET(&(obj)->magic) == OBJMAGIC)
/*断言魔数*/
#undef ASSERTOBJ
#define ASSERTOBJ(obj) (assert(ISOBJ((obj))))
其实这样的运用也不能100%的保证多线程下数据的一致性,比如两个线程A和B,同时在操作一个结构体T:
- 初始化操作,initT():
void initT(T *t) {
REFOBJ(t);
//other initial operate
}
-
处理数据操作,dealT():
void dealT(T *t) { if(!ISOBJ(t)) return; //other deal operate }
-
销毁数据,destroyT():
void destroy(T *t) { if(!UNREF(t)) return; //other destroy }
考虑下列时序:
- A刚完成
initT ()
中的REFOBJ ()
语句,将要真正的初始化T的其它的字段,这时切换到B; - B也调用
initT ()
中的REFOBJ ()
语句,发现结构体的初始化标志已经完成了,就在处理其它的字段了,结果当然是全处理的是脏数据。
再考虑下列时序:
- A和B都使用T交错的操作了T一段时间,A和B都想销毁T持有的数据而调用
destroy()
; - 假设A先进入
destroy()
,然后在UNREF ()
调用之前切换到B; - B进入
destroy()
, 并成功的调用UNREF ()
; - 在后续的操作中,不论何时发生切换都不会造成数据重复销毁。
上面情况出现的根本原因就是原子操作不原子,因为你企图使用这样的原子操作,进行非原子的多步骤的字段初始化操作,这是不会成功的。所以在你使用原子操作时,一定要考虑线程切换带来的时序问题和你的原子操作能不能使你的操作原子的进行。
下一步我们做什么
我们将使用原子操作实现一个原子锁,并说明什么情况下应该使用原子锁,什么情况下不应该使用原子锁。敬请期待哦。