当我们想写入数据时,一般情况下是:
- 决定我们将要写一些数据到一个连接里,把这些数据放入缓存里
- 等待连接可写
- 写足够多我们可以写到数据
- 记下我们写的数量,如果还有要写的数据就等待这个链接重新可以写
一个bufferevent由一个下层传输,一个读缓存,一个写缓存组成。与常规events不同,bufferevent会在准备好读写的时候调用回调函数,它会在有足够读写数据的时候调用用户提供的回调函数
有很多类型的bufferevent共用一个接口:
socket-based bufferevents
asynchronous-IO bufferevents(windows only)
filtering bufferevents
paired bufferevents
注意:
不是每个bufferevent种类都会在接下来的接口中工作。bufferevent目前也只工作在面向字节流的协议比如TCP
bufferevents和evbuffers
每个bufferevent都会有输入缓存和输出缓存,这些都是evbuffer类型的
回调函数
每个bufferevent都会有两个数据相关的回调函数,一个读回调,一个写回调。默认的,读回调函数会在下层传输有数据读的时候被调用,而写回调函数会在写缓存有足够的数据空间时去调用并输出到下层传输。我们可以重写这两个函数通过调整bufferevent的读写watermarks
每个bufferevent有四个watermarks:
Read low-water mark
当有读事件发生数据读入读缓存让读缓存内数据量等于或者高于这一水平,读回调函数就会被调用,默认水平是0,那么每次读入数据到读缓存都会调用读缓存Read high-water mark
当读事件发生数据读入读缓存让数据高于这个水平,那么读缓存不再读入数据,直到用户定义的回调函数把读缓存里的数据读走至低于这一水平。默认无限制Write low-water mark
当写事件发生,让写缓存里的数据写出至低于这一水平,用户定义的写回调函数就会调用。默认是0,也就是除非写缓存是空,那么回调函数是不会被调用的。Write high-water mark
不直接使用.
同时bufferevent也有错误或事件提醒
BEV_EVENT_READING
读缓存上发生的读事件BEV_EVENT_WRITING
BEV_EVENT_ERROR
bufferevent上发生的错误,通过调用EVUTIL_SOCKET_ERROR()发现错误BEV_EVENT_TIMEOUT
BEV_EVENT_EOF
BEV_EVENT_CONNECTED
bufferevents的参数
BEV_OPT_CLOSE_ON_FREE
BEV_OPT_THREADSAFE
BEV_OPT_DEFER_CALLBACKS
让bufferevent推迟它所有的回调函数BEV_OPT_UNLOCK_CALLBACKS
释放线程锁时的回调函数
套接字相关的bufferevent
创建套接字bufferevent
struct bufferevent *bufferevent_socket_new(
struct event_base *base,
evutil_socket_t fd,
enum bufferevent_options options);
如果想之后再设置文件描述符可以现在先设置为-1
发起连接
int bufferevent_socket_connect(struct bufferevent *bev,
struct sockaddr *address, int addrlen);
如果bev此时还没有一个套接字,那么此时会分配一个新的非阻塞套接字
例子
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <sys/socket.h>
#include <string.h>
void eventcb(struct bufferevent *bev, short events, void *ptr)
{
if (events & BEV_EVENT_CONNECTED) {
/* We're connected to 127.0.0.1:8080. Ordinarily we'd do
something here, like start reading or writing. */
} else if (events & BEV_EVENT_ERROR) {
/* An error occured while connecting. */
}
}
int main_loop(void)
{
struct event_base *base;
struct bufferevent *bev;
struct sockaddr_in sin;
base = event_base_new();
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
sin.sin_port = htons(8080); /* Port 8080 */
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, NULL, NULL, eventcb, NULL);
if (bufferevent_socket_connect(bev,
(struct sockaddr *)&sin, sizeof(sin)) < 0) {
/* Error starting connection */
bufferevent_free(bev);
return -1;
}
event_base_dispatch(base);
return 0;
}
用主机名建立连接
int bufferevent_socket_connect_hostname(struct bufferevent *bev,
struct evdns_base *dns_base, int family, const char *hostname,
int port);
int bufferevent_socket_get_dns_error(struct bufferevent *bev);
例子
#include <event2/dns.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <event2/event.h>
#include <stdio.h>
void readcb(struct bufferevent *bev, void *ptr)
{
char buf[1024];
int n;
struct evbuffer *input = bufferevent_get_input(bev);
while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) {
fwrite(buf, 1, n, stdout);
}
}
void eventcb(struct bufferevent *bev, short events, void *ptr)
{
if (events & BEV_EVENT_CONNECTED) {
printf("Connect okay.\n");
} else if (events & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) {
struct event_base *base = ptr;
if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err)
printf("DNS error: %s\n", evutil_gai_strerror(err));
}
printf("Closing\n");
bufferevent_free(bev);
event_base_loopexit(base, NULL);
}
}
int main(int argc, char **argv)
{
struct event_base *base;
struct evdns_base *dns_base;
struct bufferevent *bev;
if (argc != 3) {
printf("Trivial HTTP 0.x client\n"
"Syntax: %s [hostname] [resource]\n"
"Example: %s www.google.com /\n",argv[0],argv[0]);
return 1;
}
base = event_base_new();
dns_base = evdns_base_new(base, 1);
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, readcb, NULL, eventcb, base);
bufferevent_enable(bev, EV_READ|EV_WRITE);
evbuffer_add_printf(bufferevent_get_output(bev), "GET %s\r\n", argv[2]);
bufferevent_socket_connect_hostname(
bev, dns_base, AF_UNSPEC, argv[1], 80);
event_base_dispatch(base);
return 0;
}
bufferevent通常的操作
- 释放一个bufferevent
void bufferevent_free(struct bufferevent *bev);
如果你设置了延迟回调函数,也会等待回调函数完成后再释放
- 回调函数的操作
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
typedef void (*bufferevent_event_cb)(struct bufferevent *bev,
short events, void *ctx);
void bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg);
void bufferevent_getcb(struct bufferevent *bufev,
bufferevent_data_cb *readcb_ptr,
bufferevent_data_cb *writecb_ptr,
bufferevent_event_cb *eventcb_ptr,
void **cbarg_ptr);
//设置事件
void bufferevent_enable(struct bufferevent *bufev, short events);
void bufferevent_disable(struct bufferevent *bufev, short events);
short bufferevent_get_enabled(struct bufferevent *bufev);
//调整水平线
void bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark);
例子
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
struct info {
const char *name;
size_t total_drained;
};
void read_callback(struct bufferevent *bev, void *ctx)
{
struct info *inf = ctx;
struct evbuffer *input = bufferevent_get_input(bev);
size_t len = evbuffer_get_length(input);
if (len) {
inf->total_drained += len;
evbuffer_drain(input, len);
printf("Drained %lu bytes from %s\n",
(unsigned long) len, inf->name);
}
}
void event_callback(struct bufferevent *bev, short events, void *ctx)
{
struct info *inf = ctx;
struct evbuffer *input = bufferevent_get_input(bev);
int finished = 0;
if (events & BEV_EVENT_EOF) {
size_t len = evbuffer_get_length(input);
printf("Got a close from %s. We drained %lu bytes from it, "
"and have %lu left.\n", inf->name,
(unsigned long)inf->total_drained, (unsigned long)len);
finished = 1;
}
if (events & BEV_EVENT_ERROR) {
printf("Got an error from %s: %s\n",
inf->name, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
finished = 1;
}
if (finished) {
free(ctx);
bufferevent_free(bev);
}
}
struct bufferevent *setup_bufferevent(void)
{
struct bufferevent *b1 = NULL;
struct info *info1;
info1 = malloc(sizeof(struct info));
info1->name = "buffer 1";
info1->total_drained = 0;
/* ... Here we should set up the bufferevent and make sure it gets
connected... */
/* Trigger the read callback only whenever there is at least 128 bytes
of data in the buffer. */
bufferevent_setwatermark(b1, EV_READ, 128, 0);
bufferevent_setcb(b1, read_callback, NULL, event_callback, info1);
bufferevent_enable(b1, EV_READ); /* Start reading. */
return b1;
}
- 操作buffevent里的数据
struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);
struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);
这两个函数返回缓存区
int bufferevent_write(struct bufferevent *bufev,
const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev,
struct evbuffer *buf);
bufferevent_write()从data空间增加size字节到写缓存区末尾
bufferevent_write_buffer()将buf里所有的数据都加到缓存区末尾
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev,
struct evbuffer *buf);
例子
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <ctype.h>
void
read_callback_uppercase(struct bufferevent *bev, void *ctx)
{
/* This callback removes the data from bev's input buffer 128
bytes at a time, uppercases it, and starts sending it
back.
(Watch out! In practice, you shouldn't use toupper to implement
a network protocol, unless you know for a fact that the current
locale is the one you want to be using.)
*/
char tmp[128];
size_t n;
int i;
while (1) {
n = bufferevent_read(bev, tmp, sizeof(tmp));
if (n <= 0)
break; /* No more data. */
for (i=0; i<n; ++i)
tmp[i] = toupper(tmp[i]);
bufferevent_write(bev, tmp, n);
}
}
struct proxy_info {
struct bufferevent *other_bev;
};
void
read_callback_proxy(struct bufferevent *bev, void *ctx)
{
/* You might use a function like this if you're implementing
a simple proxy: it will take data from one connection (on
bev), and write it to another, copying as little as
possible. */
struct proxy_info *inf = ctx;
bufferevent_read_buffer(bev,
bufferevent_get_output(inf->other_bev));
}
struct count {
unsigned long last_fib[2];
};
void
write_callback_fibonacci(struct bufferevent *bev, void *ctx)
{
/* Here's a callback that adds some Fibonacci numbers to the
output buffer of bev. It stops once we have added 1k of
data; once this data is drained, we'll add more. */
struct count *c = ctx;
struct evbuffer *tmp = evbuffer_new();
while (evbuffer_get_length(tmp) < 1024) {
unsigned long next = c->last_fib[0] + c->last_fib[1];
c->last_fib[0] = c->last_fib[1];
c->last_fib[1] = next;
evbuffer_add_printf(tmp, "%lu", next);
}
/* Now we add the whole contents of tmp to bev. */
bufferevent_write_buffer(bev, tmp);
/* We don't need tmp any longer. */
evbuffer_free(tmp);
}
-
读写时间
如果有多少时间过去了,bufferevent没有成功的读写时调用。
void bufferevent_set_timeouts(struct bufferevent *bufev,
const struct timeval *timeout_read, const struct timeval *timeout_write);
-
bufferevent上的flush
int bufferevent_flush(struct bufferevent *bufev,
short iotype, enum bufferevent_flush_mode state);
//iotype: EV_READ, EV_WRITE, or EV_READ|EV_WRITE
//state:BEV_NORMAL, BEV_FLUSH, or BEV_FINISHED. BEV_FINISHED
- 其他的一些函数
int bufferevent_priority_set(struct bufferevent *bufev, int pri);
int bufferevent_get_priority(struct bufferevent *bufev);
int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd);
evutil_socket_t bufferevent_getfd(struct bufferevent *bufev);
struct event_base *bufferevent_get_base(struct bufferevent *bev);
struct bufferevent *bufferevent_get_underlying(struct bufferevent *bufev);
//这个函数返回一个被其他bufferevent作为下层传输的bufferevent
- 手动开锁解锁bufferevent
void bufferevent_lock(struct bufferevent *bufev);
void bufferevent_unlock(struct bufferevent *bufev);
如果创建bufferevent时没设置BEV_OPT_THREADSAFE,那么bufferevent_lock是没用的