环形缓冲区经常被使用到,尤其在生产者和消费者的模型中,假设生产者专门用于产生数据,而消费者专门用于处理数据,由于各种原因,可能生产者和消费者产生数据和处理数据的速度不一,比如如果处理速度有慢又快,在慢的时候,消费者产生的数据来不及处理的可能被丢弃,或者强制让生产者降速等待,在快的时候,又有可能太快,而生产者供给不了,那么消费者也必须等待.正是由于快慢不一,缓冲区的存在则恰可以进行中和,协调生产者和消费者速度不一的问题.
一.内核kfifo
首先学习一下linux内核是如何设计环形缓冲区的,毕竟内核代码精炼之至,令人叹为观止.
这里是linux2.6.27的代码
1.kfifo的结构类型
struct kfifo {
unsigned char *buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
spinlock_t *lock; /* protects concurrent modifications */
};
这里发现我们用in out描述put get操作fifo的位置,用的是unsigned int类型,后面如果我们想获得in实际在fifo的位置,用in&(size-1),这就是size下面要采用用2的乘方的原因.
而牵涉到in,out一起计算的时候,不需要进行&运算获取实际位置,即使有溢出问题也是满足的,可以使用补码进行验算,最后都是看成无符号的数.
在in out增加和减少,会自己溢出回归.
2.kfifo_init
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
gfp_t gfp_mask, spinlock_t *lock)
{
struct kfifo *fifo;
/* size must be a power of 2 */
BUG_ON(!is_power_of_2(size));
fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
if (!fifo)
return ERR_PTR(-ENOMEM);
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
fifo->lock = lock;
return fifo;
}
bool is_power_of_2(unsigned long n)
{
return (n != 0 && ((n & (n - 1)) == 0));
}
申请分配一个kfifo的结构体指针,初始化buffer使用的是函数外部的空间,in,out为0,size其中必须为2的乘方,意义为下面size-1方便进行与运算.
3.kfifo_alloc
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;
/*
* round up to the next power of 2, since our 'let the indices
* wrap' tachnique works only in this case.
*/
if (size & (size - 1)) {
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}
buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);
ret = kfifo_init(buffer, size, gfp_mask, lock);
if (IS_ERR(ret))
kfree(buffer);
return ret;
}
这个函数主要就是申请size的buffer空间,然后调用kfifo_init初始化.
4.kfifo_free
void kfifo_free(struct kfifo *fifo)
{
kfree(fifo->buffer);
kfree(fifo);
}
这个函数和kfifo_alloc配合使用,用于释放内存,先释放buffer,再释放结构体指针fifo.
5.kfifo_reset
static inline void __kfifo_reset(struct kfifo *fifo)
{
fifo->in = fifo->out = 0;
}
static inline void kfifo_reset(struct kfifo *fifo)
{
unsigned long flags;
spin_lock_irqsave(fifo->lock, flags);
__kfifo_reset(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
}
重置in out位置为0
6.kfifo_len
static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
return fifo->in - fifo->out;
}
static inline unsigned int kfifo_len(struct kfifo *fifo)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_len(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}
得到fifo中数据的长度,用fifo->in - fifo->out
是没有问题的,即便在unsigned int型溢出时也是对的,具体可以使用补码进行运算.
7.kfifo_put
unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
//put进去的字节数不能大于fifo剩余的字节数
len = min(len, fifo->size - fifo->in + fifo->out);
smp_mb();
/*fifo->in & (fifo->size - 1)通过这个与运算,相当于把
fifo->in是size的倍数给去掉了,得到的是在size里的位
置,就是在这个buffer的位置.
而l所表示的是要put进去的字节数和从in开始到buffer
结尾字节数的小值,就是从in到buffer结尾能不能放下
目的字节数*/
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
/*如果放不下,copy分两部分,一部分从in在位置复制l字
节数.一部分从buffer开始复制len-l字节数,如果放得
下,那len-l为0,一样可以*/
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
memcpy(fifo->buffer, buffer + l, len - l);
smp_wmb();
//更新in所在位置
fifo->in += len;
return len;
}
static inline unsigned int kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_put(fifo, buffer, len);
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}
8.kfifo_get
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
//get的字节数和fifo buffer中字节数比较,len为最终要get的字节数
len = min(len, fifo->in - fifo->out);
smp_rmb();
//要get的字节数,和out所在位置到fifo buffer结尾字节数,比较
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
//和put同理,两部分,一部分copy l字节数,一部分copy len-l字节数,注意方向
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
memcpy(buffer + l, fifo->buffer, len - l);
smp_mb();
//更新out位置
fifo->out += len;
return len;
}
static inline unsigned int kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_get(fifo, buffer, len);
//如果没有数据,重置
if (fifo->in == fifo->out)
fifo->in = fifo->out = 0;
spin_unlock_irqrestore(fifo->lock, flags)
//返回get的字节数
return ret;
}
二.仿造kfifo,编写的环形缓冲区ring.c ring.h
/*ring.h*/
#ifndef RING_H
#define RING_H
#include <pthread.h>
struct ring{
unsigned char *buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
pthread_mutex_t *lock; /* protects concurrent modifications */
};
extern struct ring *ring_init(unsigned char *buffer, unsigned int size,pthread_mutex_t *lock);
extern struct ring *ring_alloc(unsigned int size,pthread_mutex_t *lock);
extern void ring_free(struct ring *fifo);
extern unsigned int __ring_put(struct ring *fifo,
unsigned char *buffer, unsigned int len);
extern unsigned int __ring_get(struct ring *fifo,
unsigned char *buffer, unsigned int len);
static inline void __ring_reset(struct ring *fifo)
{
fifo->in = fifo->out = 0;
}
static inline void ring_reset(struct ring *fifo)
{
unsigned long flags;
pthread_mutex_lock(fifo->lock);
__ring_reset(fifo);
pthread_mutex_unlock(fifo->lock);
}
static inline unsigned int __ring_len(struct ring *fifo)
{
return fifo->in - fifo->out;
}
static inline unsigned int ring_len(struct ring *fifo)
{
unsigned int ret;
pthread_mutex_lock(fifo->lock);
ret = __ring_len(fifo);
pthread_mutex_unlock(fifo->lock);
return ret;
}
static inline unsigned int ring_put(struct ring *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int ret;
pthread_mutex_lock(fifo->lock);
ret = __ring_put(fifo, buffer, len);
pthread_mutex_unlock(fifo->lock);
return ret;
}
static inline unsigned int ring_get(struct ring *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int ret;
pthread_mutex_lock(fifo->lock);
ret = __ring_get(fifo, buffer, len);
if (fifo->in == fifo->out)
fifo->in = fifo->out = 0;
pthread_mutex_unlock(fifo->lock);
return ret;
}
#endif
/*ring.c*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "ring.h"
#define is_power_of_2(x) ((x) != 0 && (((x) & ((x) - 1)) == 0))
#define min(x,y) ({ \
typeof(x) _x = (x); \
typeof(y) _y = (y); \
(void) (&_x == &_y); \
_x < _y ? _x : _y; })
struct ring *ring_init(unsigned char *buffer, unsigned int size,
pthread_mutex_t *lock)
{
struct ring *fifo = NULL;
if(!is_power_of_2(size)){
printf("size is not power of 2\n");
return fifo;
}
fifo = (struct ring *)malloc(sizeof(struct ring));
if (!fifo){
printf("fifo malloc error\n");
return fifo;
}
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
fifo->lock = lock;
return fifo;
}
struct ring *ring_alloc(unsigned int size,pthread_mutex_t *lock)
{
unsigned char *buffer = NULL;
struct ring *ret = NULL;
buffer = (unsigned char *)malloc(size);
if (!buffer){
printf("buffer malloc error\n");
return ret;
}
ret = ring_init(buffer, size, lock);
return ret;
}
void ring_free(struct ring *fifo)
{
free(fifo->buffer);
free(fifo);
}
unsigned int __ring_put(struct ring *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->size - fifo->in + fifo->out);
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
memcpy(fifo->buffer, buffer + l, len - l);
fifo->in += len;
return len;
}
unsigned int __ring_get(struct ring *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out);
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
memcpy(buffer + l, fifo->buffer, len - l);
fifo->out += len;
return len;
}
三.测试
测试的main.c文件如下:
/*main.c*/
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include "ring.h"
struct data{
int a;
time_t t;
};
pthread_t tid1;
pthread_t tid2;
void sig_handler(int sig)
{
if(sig == SIGINT){
if(pthread_cancel(tid1) != 0){
perror("thread cancel fail");
exit(0);
}
if(pthread_cancel(tid2) != 0){
perror("thread cancel fail");
exit(0);
}
printf("\n\n 两个线程取消\n");
}
}
void * put_proc(void * arg)
{
signal(SIGINT, sig_handler);
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL) != 0){
perror("pthread set cancel state fail");
pthread_exit(NULL);
exit(0);
}
if(pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL) != 0){
perror("pthread set cancel type fail");
pthread_exit(NULL);
exit(0);
}
int i = 0;
struct data data_put;
struct ring * ring_buf = (struct ring *)arg;
int len = sizeof(struct data);
int ret;
while(1){
data_put.a = i;
time(&data_put.t);
ret = ring_put(ring_buf,(unsigned char *)&data_put,len);
printf("ret put:%d\nput data:%d\ntime:%s\n\n",ret,data_put.a,ctime(&data_put.t));
i++;
sleep(2);
}
}
void * get_proc(void * arg)
{
signal(SIGINT, sig_handler);
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL) != 0){
perror("pthread set cancel state fail");
pthread_exit(NULL);
exit(0);
}
if(pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL) != 0){
perror("pthread set cancel type fail");
pthread_exit(NULL);
exit(0);
}
struct ring * ring_buf = (struct ring *)arg;
int len = sizeof(struct data);
struct data data_get;
int ret;
while(1){
ret = ring_get(ring_buf,(unsigned char *)&data_get,len);
printf("ret get:%d\nget data:%d\ntime:%s\n\n",ret,data_get.a,ctime(&data_get.t));
sleep(2);
}
}
int main(int argc, char const *argv[])
{
signal(SIGINT, sig_handler);
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct ring * ring_buf = NULL;
ring_buf = ring_alloc(32,&lock);
int err;
err = pthread_create(&tid1, NULL, put_proc, ring_buf);
if(err){
printf("fail create thread 1\n");
goto end;
}
err = pthread_create(&tid2, NULL, get_proc, ring_buf);
if(err){
printf("fail create thread 2\n");
goto end;
}
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
printf("program end\n");
end:
ring_free(ring_buf);
return 0;
}
其中,建立了两个线程,一个用于向ring_buf写数据,一个用于向ring_buf读数据,数据定义时加上了时间信息便于查看.互斥量的使用主要用于线程同步,比如两个线程如果都向缓冲区写数据时,必须保证临界区的安全,当然也可以使用读写锁,其实更好一些,因为读的时候,也可以写.
信号处理函数,用于ctrl+c强制结束时,异步取消线程.
结果:
ret get:0
get data:1032341248
time:Sun Apr 21 12:30:00 4461252
ret put:16
put data:0
time:Mon Jan 8 17:35:33 2018
ret get:16
get data:0
time:Mon Jan 8 17:35:33 2018
ret put:16
put data:1
time:Mon Jan 8 17:35:35 2018
ret get:16
get data:1
time:Mon Jan 8 17:35:35 2018
ret put:16
put data:2
time:Mon Jan 8 17:35:37 2018
ret get:16
get data:2
time:Mon Jan 8 17:35:37 2018
ret put:16
put data:3
time:Mon Jan 8 17:35:39 2018
ret get:16
get data:3
time:Mon Jan 8 17:35:39 2018
ret put:16
put data:4
time:Mon Jan 8 17:35:41 2018
^C
两个线程取消
program end
刚开始,get线程没有读出数据
后面就是put一个,get一个,没问题.