由于代码量较多,先贴出实现,在后几篇文章我会详细讲解。
实现
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <stdbool.h>
#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include <errno.h>
#define DEBUG
/*
* 编译器版本
*/
/* gcc version. for example : v4.1.2 is 40102, v3.4.6 is 30406 */
#define GCC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__)
/*
*逻辑跳转优化
*/
#if GCC_VERSION
/*条件大多数为真,与if配合使用,直接执行if中语句*/
#define likely(x) __builtin_expect(!!(x), 1)
/*条件大多数为假,与if配合使用,直接执行else中语句*/
#define unlikely(x) __builtin_expect(!!(x), 0)
#else
#define likely(x) (!!(x))
#define unlikely(x) (!!(x))
#endif
/*
* intel x86 平台
*/
#if (__i386__ || __i386 || __amd64__ || __amd64)
#ifndef __X86__
#define __X86__
#endif
#endif
#ifndef _cpu_pause
#if defined(__X86__) || defined(__GNUC__)
#define _cpu_pause() __asm__("pause")
#else
#define _cpu_pause() ((void)0)
#endif
#endif
#if (GCC_VERSION >= 40100)
/* 内存访问栅 */
#define barrier() (__sync_synchronize())
/* 原子获取 */
#define AO_GET(ptr) ({ __typeof__(*(ptr)) volatile *_val = (ptr); barrier(); (*_val); })
/* 原子设置 */
#define AO_SET(ptr, value) ((void)__sync_lock_test_and_set((ptr), (value)))
/* 原子交换,如果被设置,则返回旧值,否则返回设置值 */
#define AO_SWAP(ptr, value) ((__typeof__(*(ptr)))__sync_lock_test_and_set((ptr), (value)))
/* 原子比较交换,如果当前值等于旧值,则新值被设置,返回旧值,否则返回新值*/
#define AO_CAS(ptr, comp, value) ((__typeof__(*(ptr)))__sync_val_compare_and_swap((ptr), (comp), (value)))
/* 原子比较交换,如果当前值等于旧指,则新值被设置,返回真值,否则返回假 */
#define AO_CASB(ptr, comp, value) (__sync_bool_compare_and_swap((ptr), (comp), (value)) != 0 ? true : false)
/* 原子清零 */
#define AO_CLEAR(ptr) ((void)__sync_lock_release((ptr)))
/* 通过值与旧值进行算术与位操作,返回新值 */
#define AO_ADD_F(ptr, value) ((__typeof__(*(ptr)))__sync_add_and_fetch((ptr), (value)))
#define AO_SUB_F(ptr, value) ((__typeof__(*(ptr)))__sync_sub_and_fetch((ptr), (value)))
#define AO_OR_F(ptr, value) ((__typeof__(*(ptr)))__sync_or_and_fetch((ptr), (value)))
#define AO_AND_F(ptr, value) ((__typeof__(*(ptr)))__sync_and_and_fetch((ptr), (value)))
#define AO_XOR_F(ptr, value) ((__typeof__(*(ptr)))__sync_xor_and_fetch((ptr), (value)))
/* 通过值与旧值进行算术与位操作,返回旧值 */
#define AO_F_ADD(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_add((ptr), (value)))
#define AO_F_SUB(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_sub((ptr), (value)))
#define AO_F_OR(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_or((ptr), (value)))
#define AO_F_AND(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_and((ptr), (value)))
#define AO_F_XOR(ptr, value) ((__typeof__(*(ptr)))__sync_fetch_and_xor((ptr), (value)))
#else
#error "can not supported atomic operation by gcc(v4.0.0+) buildin function."
#endif /* if (GCC_VERSION >= 40100) */
/* ------------------- */
/*
* 原子自旋锁
*/
typedef struct
{
volatile uint64_t shared : 1;
volatile uint64_t magic : 7;
volatile uint64_t pid : 18;
volatile uint64_t value : 38;
} AO_SpinlockT;
#define AO_LOCK_INLOCK (1)
#define AO_LOCK_UNLOCK (0)
#define AO_LOCK_MAGIC (119)
/*
* 静态初始化对象宏
*/
#define NULLOBJ_AO_SPINLOCK { 0, AO_LOCK_MAGIC, 0, AO_LOCK_UNLOCK }
void(AO_SpinlockInit)(AO_SpinlockT * lock, bool shared);
bool(AO_SpinTrylock)(AO_SpinlockT * lock, long val);
void(AO_SpinLock)(AO_SpinlockT * lock, long val);
void(AO_SpinUnlock)(AO_SpinlockT * lock, long val);
void(AO_SpinlockInit)(AO_SpinlockT * lock, bool shared)
{
assert(lock);
if (unlikely(lock->shared == 1))
if (likely(lock->shared))
return;
uint64_t old;
old = AO_GET((uint64_t *)lock);
AO_SpinlockT value;
value.shared = shared ? 1 : 0;
value.pid = 0;
value.magic = AO_LOCK_MAGIC;
value.value = AO_LOCK_UNLOCK;
AO_CAS((uint64_t *)lock, old, *(uint64_t *)&value);
}
#define AssertLock(lock) \
do { if (unlikely(!(lock) || (lock)->magic != AO_LOCK_MAGIC)) abort(); \
} while (0)
static __thread unsigned long g_AOLockChecker = 1;
static inline bool _AO_lockop(AO_SpinlockT *lock, long ov, long nv)
{
AO_SpinlockT old;
*(uint64_t *) &old = AO_GET((uint64_t *)lock);
old.value = ov;
AO_SpinlockT value;
value.shared = old.shared;
value.magic = old.magic;
value.pid = getpid();
value.value = nv;
bool flag = AO_CASB((uint64_t *)lock, *(uint64_t *)&old, *(uint64_t *)&value);
if (likely(flag)) {
g_AOLockChecker = 1;
}
return flag;
}
#ifndef _AO_SPIN_CHECKBOLT
#define _AO_SPIN_CHECKBOLT (2048)
#endif
static bool _AO_check_repair(AO_SpinlockT *lock, long val)
{
if (unlikely(!lock->shared)) return false;
unsigned long hit = AO_F_ADD(&g_AOLockChecker, 1);
if (likely((hit & (_AO_SPIN_CHECKBOLT - 1)) != 0)) {
sched_yield();
return false;
}
/*
* 每隔一定的时间检查一次
*/
AO_SpinlockT old;
uint64_t oldvalue = 0;
uint32_t pid = 0;
*(uint64_t *) &old = AO_GET((uint64_t *)lock);
oldvalue = old.value;
pid = old.pid;
#ifdef DEBUG
fprintf(stderr, ">>> WAITER `%d` : pid : `%u`, value : `%llu` CHECK SPIN LOCK <<<\n" , getpid(), pid, oldvalue);
#endif
if (unlikely(pid == 0))
return false;
/*判定锁持有进程是否存活*/
int flag = 0;
flag = kill(pid, 0);
if (likely(flag > 0 || errno == EPERM)) {
sched_yield();
return false;
}
AO_SpinlockT value;
value.shared = old.shared;
value.magic = old.magic;
value.pid = getpid();
value.value = val;
/*
* 持有进程已关闭
* 此时lock->pid被设置成有效的pid,以保证只有一个检查进程成功重置死锁
*/
bool rc = 0;
rc = AO_CASB((uint64_t *)lock, *(uint64_t *)&old, *(uint64_t *)&value);
if (unlikely(!rc)) {
sched_yield();
return false;
}
#ifdef DEBUG
fprintf(stderr, ">>> WAITER `%d` REPAIR LOCK <<<\n", getpid());
#endif
g_AOLockChecker = 1;
return true;
}
bool(AO_SpinTrylock)(AO_SpinlockT * lock, long val)
{
bool flag = false;
AssertLock(lock);
flag = _AO_lockop(lock, AO_LOCK_UNLOCK, val);
if (unlikely(!flag))
flag = _AO_check_repair(lock, val);
return flag;
}
#ifndef _AO_SPIN_SPINMAX
#define _AO_SPIN_SPINMAX (256)
#endif
void(AO_SpinLock)(AO_SpinlockT * lock, long val)
{
AssertLock(lock);
do {
if (likely(_AO_lockop(lock, AO_LOCK_UNLOCK, val))) {
return;
}
#ifdef _AO_SPIN_SPINMAX
int i = 0;
for (i = 0; i < _AO_SPIN_SPINMAX; i++) {
int j = 0;
for (j = 0; j < i; j += 1) {
/*原子锁忙等待算法*/
#if 0
_cpu_pause();
#else
/*equivalent of j % 8 */
if (likely(j & 0x111)) {
_cpu_pause();
} else {
sched_yield();
}
#endif
}
if (likely(_AO_lockop(lock, AO_LOCK_UNLOCK, val))) {
return;
}
}
#endif /* ifdef _AO_SPIN_SPINMAX */
} while (likely(!_AO_check_repair(lock, val)));
}
void(AO_SpinUnlock)(AO_SpinlockT * lock, long val)
{
bool flag = false;
AssertLock(lock);
flag = _AO_lockop(lock, val, AO_LOCK_UNLOCK);
if (unlikely(!flag)) {
#ifdef DEBUG
fprintf(stderr, "you must lock this locker befor unlock:%llu.\n", *(uint64_t *)lock);
#endif
abort();
}
}
#define AO_SpinlockInit(l, s) \
((AO_SpinlockInit)((l), (s)))
#define AO_SpinTrylock(l) \
((AO_SpinTrylock)((l), (long)AO_LOCK_INLOCK))
#define AO_SpinLock(l) \
((AO_SpinLock)((l), (long)AO_LOCK_INLOCK))
#define AO_SpinUnlock(l) \
((AO_SpinUnlock)((l), (long)AO_LOCK_INLOCK))
测试用例
/*test*/
#define MMAP_INTI_TRYS 1000
struct counter
{
/* data */
int counter;
int magic;
long exppid;
AO_SpinlockT locker;
};
#include <fcntl.h>
#include <stdint.h>
#include <string.h>
#include <sys/mman.h>
#define TEST_MAGIC (0x12345678)
#define TEST_WAKESIG_CHILD (SIGUSR1)
#define TEST_WAKESIG_FATHER (SIGUSR2)
void sighandler(int signo)
{
printf("%d interrupte by %d\n", getpid(), signo);
}
void initial(struct counter **ptr, const char *path)
{
int fd = -1;
bool iscreate = false;
assert(ptr);
assert(path);
*ptr = NULL;
fd = open(path, O_CREAT | O_EXCL | O_RDWR, 0666);
if (likely(fd < 0)) {
if (unlikely(errno != EEXIST)) {
fprintf(stderr, "open %s fail : %s\n", path, strerror(errno));
exit(EXIT_FAILURE);
}
fd = open(path, O_RDWR, 0666);
if (unlikely(fd < 0)) {
fprintf(stderr, "open %s fail : %s\n", path, strerror(errno));
exit(EXIT_FAILURE);
}
} else {
iscreate = true;
}
if (iscreate) {
ftruncate(fd, sizeof(struct counter));
}
*ptr = (struct counter *)mmap(NULL, sizeof(struct counter), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (unlikely(ptr == MAP_FAILED)) {
fprintf(stderr, "mmap fail : %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
if (iscreate) {
AO_SpinlockInit(&((*ptr)->locker), true);
(*ptr)->magic = 0;
}
(*ptr)->counter = 0;
(*ptr)->exppid = 0;
sigset_t sigset;
sigfillset(&sigset);
sigprocmask(SIG_BLOCK, &sigset, NULL);
signal(TEST_WAKESIG_CHILD, sighandler);
signal(TEST_WAKESIG_FATHER, sighandler);
signal(SIGCHLD, sighandler);
}
void waitfather(struct counter *ptr)
{
assert(ptr);
printf("child start : %d\n", getpid());
kill(getppid(), TEST_WAKESIG_FATHER);
sigset_t sigset;
sigfillset(&sigset);
sigdelset(&sigset, TEST_WAKESIG_CHILD);
sigdelset(&sigset, SIGINT);
while (ptr->magic != TEST_MAGIC) {
printf("child wait father : %d, %d\n", getpid(), getppid());
sigsuspend(&sigset);
}
}
void waitchild(struct counter *ptr)
{
assert(ptr);
sigset_t sigset;
sigfillset(&sigset);
sigdelset(&sigset, TEST_WAKESIG_FATHER);
sigdelset(&sigset, SIGINT);
while (1) {
printf("father wait child: %d\n", getpid());
sigsuspend(&sigset);
break;
}
ptr->magic = TEST_MAGIC;
/*wake up all child*/
kill(0, TEST_WAKESIG_CHILD);
}
void worker(struct counter *ptr, int loops)
{
waitfather(ptr);
printf("child start work : %d\n", getpid());
while (loops-- > 0) {
AO_SpinLock(&ptr->locker);
ptr->counter++;
printf("child %d : %d\n", getpid(), ptr->counter);
usleep(5 * 1000);
#ifdef TEST_REPAIR
if (loops > 0 && (loops & 31) == 0 && ptr->exppid == getpid()) {
printf("child %d occured exception and exit.\n", getpid());
exit(EXIT_FAILURE);
}
#endif
AO_SpinUnlock(&ptr->locker);
sched_yield();//让进程让出执行权,好让其它进程运行
}
AO_SpinLock(&ptr->locker);
printf("child %d finish\n", getpid());
AO_SpinUnlock(&ptr->locker);
exit(EXIT_SUCCESS);
}
int main(int argc, char const *argv[])
{
int fd = -1;
int i = 0;
int loop = 0;
int performers = 0;
struct counter *ptr = NULL;
if (unlikely(argc != 4)) {
fprintf(stderr, "Usage %s <path> <#performers> <#loop>\n", argv[0]);
return EXIT_FAILURE;
}
performers = atoi(argv[2]);
loop = atoi(argv[3]);
if (unlikely(!(performers > 0 && loop > 0))) {
fprintf(stderr, "Usage %s <path> <#performers> <#loop>\n", argv[0]);
return EXIT_FAILURE;
}
initial(&ptr, argv[1]);
pid_t child;
int childs = performers;
while (childs-- > 0) {
child = fork();
switch (child) {
case -1 : {
fprintf(stderr, "fork fail : %s\n", strerror(errno));
goto end;
}
case 0 : {
worker(ptr, loop);
}
default : {
break;
}
}
}
#ifdef TEST_REPAIR
ptr->exppid = child;
#endif
waitchild(ptr);
end:
printf("join child\n");
while (1) {
child = waitpid(0, NULL, WNOHANG);
if (child == -1) {
break;
}
}
printf(">>>>>\n\tTest result : %s\n>>>>>\n",
likely(ptr->counter == performers * loop) ?
"SUCCESS" : "FAILURE");
return EXIT_SUCCESS;
}
编译
1.不测试修复功能
gcc -g main.c -o main -lpthread
2.测试修复功能
gcc -g main.c -o main -lpthread -DTEST_REPAIR
测试
[zhoukai@zhoukai-MBPR:Desktop]$ time ./main /tmp/mmap.tmp 4 10
child start : 6481
child start work : 6481
child 6481 : 1
child start : 6482
child start work : 6482
child start : 6483
child start work : 6483
father wait child: 6480
6480 interrupte by 31
join child
child start : 6484
child start work : 6484
child 6482 : 2
child 6482 : 3
child 6482 : 4
child 6482 : 5
child 6482 : 6
child 6482 : 7
child 6484 : 8
child 6484 : 9
child 6484 : 10
child 6484 : 11
child 6482 : 12
child 6484 : 13
child 6484 : 14
child 6484 : 15
child 6484 : 16
child 6484 : 17
child 6481 : 18
child 6481 : 19
child 6481 : 20
child 6481 : 21
child 6481 : 22
child 6481 : 23
child 6481 : 24
child 6481 : 25
child 6481 : 26
child 6481 finish
child 6482 : 27
child 6482 : 28
child 6482 : 29
child 6482 finish
child 6484 : 30
child 6484 finish
child 6483 : 31
child 6483 : 32
child 6483 : 33
child 6483 : 34
child 6483 : 35
child 6483 : 36
child 6483 : 37
child 6483 : 38
child 6483 : 39
child 6483 : 40
child 6483 finish
>>>>>
Test result : SUCCESS
>>>>>
real 0m0.138s
user 0m0.057s
sys 0m0.093s