libevent学习笔记
libevent是一个事件驱动编程库,可以在文件描述符上发生特定事件、超时后,执行相应的回调函数。回调函数还可以由信号、定时器触发。
使用libevent后,你不需要为事件驱动的网络服务编写事件循环,而只需要调用 event_dispatch()并动态注册/删除事件。
libevent目前支持/dev/poll, kqueue(2), event ports, POSIX select(2), Windows select(), poll(2), epoll(4)等I/O多路复用机制,这些底层机制和libevent提供的API完全解耦,你不需要关心。libevent可以根据系统选择最适合的机制。
你可以在多线程环境下使用libevent,并选取以下编程风格中的一种:
- 每个线程独立使用一个 event_base
- 使用共享的event_base并使用锁保护
除了基础的事件轮询之外,libevent还实现了精巧的网络IO框架,支持套接字、过滤器、限速、SSL、零拷贝文件传输、IOCP,libevent内置对DNS、HTTP的支持,还提供了一个小型的RPC框架。
1 2 3 4 5 6 7 8 |
wget https://github.com/libevent/libevent/releases/download/release-2.1.8-stable/libevent-2.1.8-stable.tar.gz tar -zxvf libevent-2.1.8-stable.tar.gz pushd libevent-2.1.8-stable mkdir build cd build cmake -DCMAKE_BUILD_TYPE=Debug .. make sudo make install |
本章以Linux下的epoll为例,了解I/O多路复用技术的原理。
支持I/O操作的内核对象,都属于流(Stream),包括文件、管道、套接字。操控流时需要使用其文件描述符。当流中没有数据时,读线程会阻塞;当流已经写满数据时,写线程会阻塞。
为了解决阻塞的问题,Linux提供了多种系统调用,包括select、epoll。后者目前被广泛使用,它的优势是:
- 无需遍历被监控文件描述符的完整集合,只需要关注活动的连接
- 支持处理大量连接请求而不出现性能下降
epoll支持两种事件触发模式:
- 水平触发:内核事件集会拷贝给用户态事件,假设用户处理了一部分,那些没有处理的会在下次epoll_wait再次返回
- 边缘触发:任何内核事件仅仅会被通知一次。拷贝减少性能增加,但是用户代码如果没有处理事件,会导致事件丢失
1 2 3 4 5 6 |
// size 提示内核此epoll实例需要监控的fd的数量 // 返回epoll实例的文件描述符 int epoll_create(int size); // 示例: int epfd = epoll_create(1024); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
// epfd 控制的epoll实例 // op 执行的控制行为: // EPOLL_CTL_ADD,注册新的fd到epfd // EPOLL_CTL_MOD,修改已经注册的fd的监听事件 // EPOLL_CTL_DEL,删除一个fd // // fd 目标文件描述符 // event 告诉内核需要监听的事件 // // 成功返回0,失败返回-1, errno查看错误信息 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // 事件的结构如下: struct epoll_event { // EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLHUP, EPOLLET, EPOLLONESHOT等类型 uint32_t events; // 事件类型 epoll_data_t data; // 用户传递的数据 } // 用户数据联合体 typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; |
1 2 3 4 5 6 7 8 9 10 11 |
// epfd 等待的epoll实例 // event 事件缓冲,内核将等到的事件存放到此数组中 // maxevents 最多获取多少个事件,不得大于epoll_create()调用时指定的size // timeout 超时,超过指定的时间从等待返回: // -1: 永久阻塞 0: 立即返回,非阻塞 >0: 指定微秒数 // 返回 就绪的文件描述符数量 int epoll_wait(int epfd, struct epoll_event *event, int maxevents, int timeout); // 示例: struct epoll_event event_buffer[512]; int event_count = epoll_wait(epfd, epoll_event, 512, -1); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// 创建实例 int epfd = epoll_crete(1024); // 将监听套接字的文件描述符加入监控 epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &listen_event); while (1) { // 等待就绪 int event_count = epoll_wait(epfd, events, 512, -1); // 遍历就绪事件列表 for (i = 0 ; i < event_count; i++) { if (evnets[i].data.fd == listen_fd) { // 这是监听套接字上的事件,accept连接,并将连接套接字的fd加入到epoll } else if (events[i].events & EPOLLIN) { // 可读 } else if (events[i].events & EPOLLOUT) { // 可写 } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
#include <stdio.h> #include <stdlib.h> #include <ctype.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <sys/epoll.h> #define SERVER_PORT (8080) #define SERVER_IP ("127.0.0.1") #define EPOLL_MAX_NUM (1024) #define BUFFER_MAX_LEN (4096) char buffer[BUFFER_MAX_LEN]; int main(int argc, char **argv) { int listen_fd = 0; int client_fd = 0; struct sockaddr_in server_addr; struct sockaddr_in client_addr; socklen_t client_len; int epfd = 0; struct epoll_event event, *events_buffer; // 创建套接字 listen_fd = socket(AF_INET, SOCK_STREAM, 0); // 绑定到指定地址和端口 server_addr.sin_family = AF_INET; // 展现形式(p,字符串)转换为数字形式(n) inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr); server_addr.sin_port = htons(SERVER_PORT); bind(listen_fd, (struct sockaddr *) &server_addr, sizeof(server_addr)); // 开始监听连接 listen(listen_fd, 10); // 创建epoll实例 epfd = epoll_create(EPOLL_MAX_NUM); if (epfd < 0) goto END; // 将监听套接字的可读事件加入到epoll event.events = EPOLLIN; event.data.fd = listen_fd; if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event) < 0) goto END; // 分配事件缓冲 events_buffer = malloc(sizeof(struct epoll_event)*EPOLL_MAX_NUM); // 事件循环 while (1) { // 等待一批事件,有事件就可能返回,不一定要等待到EPOLL_MAX_NUM int active_fds_cnt = epoll_wait(epfd, events_buffer, EPOLL_MAX_NUM, 1000); for (int i = 0; i < active_fds_cnt; i++) { // 监听套接字就绪 if (events_buffer[i].data.fd==listen_fd) { // 接受新连接 client_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_len); if (client_fd < 0) continue; char ip[20]; // 数字转换为展现形式 const char *client_ip = inet_ntop(AF_INET, &client_addr.sin_addr, ip, sizeof(ip)); // 网络序转换为主机序 uint16_t client_port = ntohs(client_addr.sin_port); printf("new connection %s:%d accepted\n", client_ip, client_port); // 将连接套接字加入epoll,关注其可读可写事件 event.events = EPOLLIN | EPOLLET; event.data.fd = client_fd; epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &event); } else if (events_buffer[i].events & EPOLLIN) { // 连接套接字就绪,可读 client_fd = events_buffer[i].data.fd; buffer[0] = '\0'; // 读入最多5字节 int n = read(client_fd, buffer, 5); if (n < 0) { // 读取失败 continue; } else if (n==0) { // 读取到0字节,说明EOF,关闭连接并注册epoll监听 epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &event); close(client_fd); } else { // 读取到数据 printf("data from client: %s\n", buffer); // 原样写回去 buffer[n] = '\0'; // 这是阻塞写 write(client_fd, buffer, strlen(buffer)); // 重置缓冲区 memset(buffer, 0, BUFFER_MAX_LEN); } } else if (events_buffer[i].events & EPOLLOUT) { // 在这里进行非阻塞写 } } } END: close(epfd); close(listen_fd); return 0; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <unistd.h> #include <fcntl.h> #define BUFFER_MAX_LEN (1024) #define SERVER_PORT (8080) #define SERVER_IP ("127.0.0.1") int main(int argc, char **argv) { int sockfd; char recvbuf[BUFFER_MAX_LEN + 1] = {0}; struct sockaddr_in server_addr; // 创建套接字 if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) exit(0); // 服务器地址 bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(SERVER_PORT); inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr); // 连接到服务器 if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { fprintf(stderr, "failed to connect to server \n"); exit(0); } char input[100]; // 从标准输入读取输入 while (fgets(input, 100, stdin)!=NULL) { send(sockfd, input, strlen(input), 0); // 读取服务器输出 read(sockfd, recvbuf, BUFFER_MAX_LEN); printf("echo %s\n", recvbuf); } } |
使用 libevent 函数之前需要分配一个或者多个event_base结构。此结构持有libevent分发循环(dispatch loop)的信息和状态,它是libevent的中心,跟踪所有未决、活动事件并将活动事件通知给应用程序。
event_base是一个不透明结构(其字段没有定义在头文件中)。调用 event_base_new()或 event_base_new_with_config()可以实例化event_base。
配合锁,你可以在多个线程中访问event_base,但是,其事件循环只能在单个线程中执行。
event_base可以选择一种IO多路复用技术,作为检测事件是否就绪的手段。这些IO多路复用技术包括: select, poll, epoll, kqueue, devpoll, evport, win32
大部分情况下,调用下面的函数即可:
1 2 3 |
// 创建并返回一个具有默认设置的event_base // 如果出错返回NULL struct event_base *event_base_new(void); |
如果要定制even_base的特性,可以调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
// 创建一个配置 struct event_config *event_config_new(void); // 定制化 // 禁止使用某种IO多路复用机制 int event_config_avoid_method(struct event_config *cfg, const char *method); // 要求IO多路复用机制支持某些特性,不支持特性的后端不被使用 enum event_method_feature { EV_FEATURE_ET = 0x01, // 支持边缘触发 EV_FEATURE_O1 = 0x02, // 添加、删除单个事件,以及确定哪个事件是激活的 —— 这些操作必须是O(1)复杂度 EV_FEATURE_FDS = 0x04, // 支持任何文件描述符,而不仅仅是套接字 }; int event_config_require_features(struct event_config *cfg, enum event_method_feature feature); // 设置标记 enum event_base_config_flag { EVENT_BASE_FLAG_NOLOCK = 0x01, // 不要为 event_base 分配锁,节省加解锁开销,但是多线程访问不安全 EVENT_BASE_FLAG_IGNORE_ENV = 0x02, // 选择使用的后端时,不要检测 EVENT* 环境变量 EVENT_BASE_FLAG_NO_CACHE_TIME = 0x08, EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST = 0x10, // 如果选择了epoll后端,则使用更快的epoll-changelist。不得传递基于dup()或其变体克隆的文件描述符给lbevent EVENT_BASE_FLAG_PRECISE_TIMER = 0x20 }; int event_config_set_flag(struct event_config *cfg, enum event_base_config_flag flag); // 使用配置实例化 struct event_base * event_base_new_with_config(const struct event_config *cfg); |
1 2 |
// 该函数不会释放与 event_base 关联的任何事件,或者关闭他们的套接字,或者释放任何指针 void event_base_free(struct event_base *base); |
libevent支持为事件设置多个优先级。默认情况下event_base仅仅支持单个优先级,调用下面的方法设置优先级数量:
1 |
int event_base_priority_init(struct event_base *base, int n_priorities); |
新的事件可以使用[0,n_priorities-1]这个范围的优先级,值越低优先级越高。
某些事件后端,在fork后不能正常工作,你可能需要重现初始化event_base:
1 2 3 4 5 6 7 8 9 |
struct event_base *base = event_base_new(); if (fork()) { continue_running_parent(base); } else { // 子进程 event_reinit(base); continue_running_child(base); } |
事件是libevent操控的基本单元,事件代表一组条件的集合,这些条件包括:
- 文件描述符已经就绪,可以读取或者写入
- 对于边缘触发:文件描述符变为就绪状态,可以读取或者写入
- 超时
- 信号
- 用户触发事件
所有事件具有相似的生命周期:
- 创建事件(并关联到event_base)后,变为initialized状态
- 添加到event_base中后,变为pending状态
- 触发事件的条件发生后,变为active状态。事件的回调会执行
- 如果将事件配置为persistent,则它总是保持在pending状态。否则,执行回调后,事件不再是pending
- 从event_base删除事件后,它从pending变为initialized。再次添加则由变为pending
调用event_new函数可以创建事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
// 指示超时已经发生 #define EV_TIMEOUT 0x01 // 等待套接字或FD可读 #define EV_READ 0x02 // 等待套接字或FD可写 #define EV_WRITE 0x04 // 等待POSIX信号发生 #define EV_SIGNAL 0x08 // 持久化事件 —— 不会因为激活而被自动移除 #define EV_PERSIST 0x10 // 使用边缘触发 #define EV_ET 0x20 // 事件回调函数指针的定义 typedef void (*event_callback_fn)(evutil_socket_t, short, void *); // 分配一个事件并关联到event_base // base 事件关联到的event_base // fd 需要监控的文件描述符或者信号 // what 希望监控的条件、事件的性质、触发方式 EV_READ, EV_WRITE, EV_SIGNAL, EV_PERSIST, EV_ET // cb 事件发生后执行的回调函数 // arg 回调函数的参数 struct event *event_new(struct event_base *base, evutil_socket_t fd, short what, event_callback_fn cb, void *arg); void event_free(struct event *event); |
event_assign也可以用来创建事件,和event_new不同的是,前者不负责分配内存,你必须自己分配好event结构:
1 |
int event_assign(struct event *, struct event_base *, evutil_socket_t, short, event_callback_fn, void *); |
事件创建后,你可以调用 event_add()或 event_del() 来添加(使之pending)、删除(使之非pending)事件。
可以使用下面的宏:
1 2 |
#define evsignal_new(base, signum, cb, arg) \ event_new(base, signum, EV_SIGNAL|EV_PERSIST, cb, arg) |
对于大部分的IO多路复用机制,每个进程任何时刻只能有一个 event_base 可以监听信号。
构造事件之后,在将其添加到 event_base 之前不能对其做任何操作。你需要调用:
1 2 3 |
// 如果tv==NULL则添加的事件不会超时 // 如果对已决事件调用该函数,则在tv后事件重新调度,此前保持未决状态 int event_add(struct event *ev, const struct timeval *tv); |
将事件加入event_base并使其进入未决(pending)状态。
调用下面的函数,可以使事件变为非未决也非激活的:
1 |
int event_del(struct event *ev); |
默认情况下,多个事件激活,其回调执行顺序是未知的。使用优先级,可以让某些事件优先执行:
1 |
int event_priority_set(struct event *event, int priority); |
注意:多个不同优先级的事件同时激活时 ,低优先级的事件不会运行 。libevent会执行高优先级的事件,然后重新检查各个事件。只有在没有高优先级的事件是激活的时候,低优先级的事件才有机会执行。
调用下面的函数,可以知晓目标事件是未决还是激活的:
1 |
int event_pending(const struct event *ev, short what, struct timeval *tv_out); |
执行下面的方法,可以调度一个”一次性“事件。
1 |
int event_base_once(struct event_base *, evutil_socket_t, short, event_callback_fn, void *, const struct timeval *); |
回调仅仅会执行一次。
即少数情况下,需要在条件不满足的情况下,强行触发一个事件
1 |
void event_active(struct event *ev, int what, short ncalls); |
你一旦为event_base注册了某些事件, 你需要让libevent等待事件的发生,并在事件发生后进行通知。
调用下面的函数启动事件循环:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
// 这些宏用作flags参数 // 阻塞,直到有事件激活,当所有事件的回调被执行后,退出事件循环 #define EVLOOP_ONCE 0x01 // 不阻塞,检查那些事件已经就绪,然后执行最高优先级的那些事件的回调,然后退出 #define EVLOOP_NONBLOCK 0x02 // 即使没有pending事件也不退出,必须调用event_base_loopexit()或event_base_loopbreak()来退出 #define EVLOOP_NO_EXIT_ON_EMPTY 0x04 // 等待活动(Active,激活)事件,并且运行它们的回调,此函数是event_base_dispatch()更灵活的版本 // 默认情况下,此事件循环会运行event base直到: // 1、没有任何pending或active事件 // 2、或者,event_base_loopbreak()或event_base_loopexit()被调用 // 返回值:如果正常退出返回0,否则返回-1 int event_base_loop(struct event_base *base, int flags); |
你也可以调用简化版本:
1 |
int event_base_dispatch(struct event_base *base); |
根据启动循环时指定的flags的不同,循环可能在没有任何pending事件(也就是已经移除所有已注册事件)后自动结束。或者,你可以调用下面的函数提前结束:
1 2 3 4 5 |
// 在给定的时间tv后停止循环,如果tv为NULL立即停止 // 注意:如果 event_base 当前正在执行任何激活事件的回调,则回调会继续运行,直到运行完所有激活事件的回调之才停止 int event_base_loopexit(struct event_base *base, struct timeval *tv); // 立即退出循环,如果正在执行回调,当前回调完成后立即停止,不考虑尚未处理的事件 int event_base_loopbreak(struct event_base *base); |
libevent提供了一种通用的缓冲机制 —— bufferevent。前面章节介绍的事件,在底层传输接口就绪后立即执行回调,而bufferevent在读写足够多的数据之后才执行回调。
bufferevent由一个底层传输接口(例如套接字)、一个读缓冲区、一个写缓冲区组成。缓冲区的类型是 struct evbuffer。
默认情况下:
- 从底层传输端口读取了任意量的数据之后会调用读取回调
- 输出缓冲区中足够量的数据被刷到底层传输端口后写入回调会被调用
通过调整“水位”,可以覆盖上述默认行为。每个bufferevent具有4个水位:
- 读取低水位:读缓冲区数据量高于此水位后,读取回调被调用。默认值0,导致一旦缓冲区有数据,立即调用回调
- 读取高水位:读缓冲区数据量高于此水位后,bufferevent不再将数据读取到缓冲区。默认值无限
- 写入低水位:写缓冲区数据量低于此水位后,写入回调被调用。默认值0,导致仅当缓冲区全部写入到底层传输接口后,才调用写入回调继续写入
- 写入高水位:没有直接使用。当一个bufferevent用作另外一个bufferevent的底层传输接口时有用
下面的函数可以调整bufferevent的读或/和写水位:
1 2 |
// 对于高水位,0表示无限 void bufferevent_setwatermark(struct bufferevent *bufev, short events, size_t lowmark, size_t highmark); |
调用下面的方式,创建基于套接字的bufferevent:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
enum bufferevent_options { // 释放 bufferevent 时关闭底层传输端口,例如:关闭底层套接字、释放底层bufferevent BEV_OPT_CLOSE_ON_FREE = (1<<0), // 自动为 bufferevent 分配锁,使之可线程安全的访问 BEV_OPT_THREADSAFE = (1<<1), // 所有回调在事件循环中延迟的调用 BEV_OPT_DEFER_CALLBACKS = (1<<2), // 执行回调时不对bufferevent进行锁定 BEV_OPT_UNLOCK_CALLBACKS = (1<<3) }; struct bufferevent *bufferevent_socket_new( // 关联到的event_base struct event_base *base, // 套接字的描述符,如果希望以后设置此描述符,传入-1 evutil_socket_t fd, enum bufferevent_options options); |
调用bufferevent_setcb可以修改bufferevent的一个或多个回调:
- readcb:缓冲区的数据可读(考虑水位)时调用
- writecb:文件描述符可以写入时(考虑水位)时调用
- eventcb:文件描述符上发生事件时调用
参数cbarg为传递给所有回调的参数。注意此参数被上述三种回调共享,修改它会影响所有回调。
要禁用回调,传输NULL作为回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx); typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx); void bufferevent_setcb(struct bufferevent *bufev, bufferevent_data_cb readcb, bufferevent_data_cb writecb, bufferevent_event_cb eventcb, void *cbarg); void bufferevent_getcb(struct bufferevent *bufev, bufferevent_data_cb *readcb_ptr, bufferevent_data_cb *writecb_ptr, bufferevent_event_cb *eventcb_ptr, void **cbarg_ptr); |
下面的函数用于启用、禁用某种事件:
1 2 3 4 |
void bufferevent_enable(struct bufferevent *bufev, short events); void bufferevent_disable(struct bufferevent *bufev, short events); short bufferevent_get_enabled(struct bufferevent *bufev); |
events可选EV_READ|EV_WRITE。如果没有启用读事件则bufferevent不会尝试读取数据到缓冲区,写事件类似。
调用下面的方法可以向服务器发起一个连接:
1 |
int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *address, int addrlen); |
如果执行此调用时:
- 尚未为bufferevent设置套接字,则自动创建非阻塞套接字。
- 已经为bufferevent设置套接字,则提示套接字尚未连接,直到连接成功前不应对其进行读写操作
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
#include <event2/event.h> #include <event2/bufferevent.h> #include <sys/socket.h> #include <string.h> // 事件回调 void eventcb(struct bufferevent *bev, short events, void *ptr) { // 如果通过bufferevent_socket_connect()发起连接,将会收到BEV_EVENT_CONNECTED事件 // 如果通过connect()连接,则报告为写入事件 if (events & BEV_EVENT_CONNECTED) { // 连接成功 } else if (events & BEV_EVENT_ERROR) { // 出现错误 } } int main() { struct event_base *base; struct bufferevent *bev; struct sockaddr_in sin; base = event_base_new(); // 连接到服务器127.0.0.1:8080 memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl(0x7f000001); sin.sin_port = htons(8080); // 创建bufferevent bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); // 设置回调 bufferevent_setcb(bev, NULL, NULL, eventcb, NULL); // 发起连接 if (bufferevent_socket_connect(bev, (struct sockaddr *)&sin, sizeof(sin)) < 0) { // 出错,释放bufferevent bufferevent_free(bev); return -1; } // 启动事件循环 event_base_dispatch(base); return 0; } |
调用下面的函数,可以获得bufferevent的读写缓冲区:
1 2 3 4 5 6 |
// 读缓冲 struct evbuffer *bufferevent_get_input(struct bufferevent *bufev); // 写缓冲 struct evbuffer *bufferevent_get_output(struct bufferevent *bufev); // 注意,不得在这些缓冲区上设置回调 |
1 2 3 4 |
// 将data的前size个字节写入输出缓冲区 int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size); // 移除buf中所有内容,将其放入输出缓冲区的尾部 int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf); |
1 2 3 4 |
// 读取并移除输入缓冲区的最多size字节,并转储到data中,返回实际读取(移除)的字节数 size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size); // 抽取输入缓冲区中所有数据,存放到buf中 int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf); |
刷出(flush)读写缓冲,意味着需要尽可能的:
- 从底层传输接口读取尽可能多的数据
- 将写缓冲区的数据尽可能写如底层传输接口
1 2 3 |
// iotype,可以是 EV_READ | EV_WRITE 表示需要读还是写 // state,BEV_FINISHED告知对端,没有更多数据需要发送了。BEV_NORMAL、BEV_FLUSH的含义依赖于bufferevent类型 int bufferevent_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode state); |
读写bufferevnet的数据,操作的主要类型是结构evbuffer。evbuffer实现了一种优化的字节队列,支持从尾部追加数据,或从头部移除数据。
evbuffer用于缓冲网络I/O的缓冲区的处理。IO调度、触发由bufferevent负责。
1 |
struct evbuffer *evbuffer_new(void); |
1 |
void evbuffer_free(struct evbuffer *buf); |
默认情况下,在多线程中访问evbuffer是不安全的,除非调用下面的方法启用锁:
1 2 |
// 如果lock为NULL,则libevent利用evthread_set_lock_creation_callback创建锁 int evbuffer_enable_locking(struct evbuffer *buf, void *lock); |
要启用、禁用锁,调用下面的函数:
1 2 |
void evbuffer_lock(struct evbuffer *buf); void evbuffer_unlock(struct evbuffer *buf); |
下面的函数返回evbuffer存储的字节数:
1 |
size_t evbuffer_get_length(const struct evbuffer *buf); |
1 2 3 4 5 6 |
// 从data处开始,写入datlen字节数进去 int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen); // 格式化数据并全部写入 int evbuffer_add_printf(struct evbuffer *buf, const char *fmt, ...) int evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap); |
1 |
int evbuffer_expand(struct evbuffer *buf, size_t datlen); |
上述函数确保evbuffer能足以容纳datlen字节,而不需要更多的内存分配
libevent提供了将数据从一个缓冲移动到另外一个的快捷方式:
1 2 3 4 |
// 将src中的数据移动到dst的尾部,成功返回0 int evbuffer_add_buffer(struct evbuffer *dst, struct evbuffer *src); // 将src的datlen字节移动到dst的尾部 int evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, size_t datlen); |
1 |
void bufferevent_free(struct bufferevent *bev); |
上面的函数用于释放bufferevent,bufferevent的内部具有引用计数机制,当上述函数调用时,尚有未决的延迟回调,则回调执行完毕之前不会删除bufferevent。
如果指定了BEV_OPT_CLOSE_ON_FREE,并且bufferevent使用套接字、其它bufferevent作为其底层传输接口,则它们会被一并关闭。
在event2/listener.h中定义的连接监听器 —— evconnlistener,为libevent添加了监听/接受TCP连接的方法。
要创建新的连接监听器,调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
struct evconnlistener * evconnlistener_new( // 关联的event_base struct event_base *base, // 新连接的回调,如果NULL则连接监听器是禁用的 evconnlistener_cb cb, // 传递给回调的参数 void *ptr, // 用于控制回调函数的行为 unsigned flags, // 尚未被接受的、排队的连接最大数量。如果为负数,libevent自动选择适当的值;如果为0,则认为提供的套接字的listen已经调用过 int backlog, // 监听套接字的FD evutil_socket_t fd ); struct evconnlistener * evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, // 由libevent负责绑定监听套接字 const struct sockaddr *sa, int socklen); // 回调函数: typedef void (*evconnlistener_cb)(struct evconnlistener *listener, // 新接受的套接字 evutil_socket_t sock, // 客户端地址和地址长度 struct sockaddr *addr, int len, void *ptr); // ptr是先前提供的指针 // flags支持: // 默认情况下,监听器接收到新连接后自动将其设置为非阻塞的。下面的标记禁用此行为 #define LEV_OPT_LEAVE_SOCKETS_BLOCKING (1u<<0) // 释放此连接监听器时,自动关闭底层套接字 #define LEV_OPT_CLOSE_ON_FREE (1u<<1) // 为底层套接字设置 close-on-exec 标志 #define LEV_OPT_CLOSE_ON_EXEC (1u<<2) // 某些平台在默认情况下,关闭某监听套接字后,需要经过一个超时,相同端口才可再次绑定,此选项禁用此行为,可以立即再次绑定 #define LEV_OPT_REUSEABLE (1u<<3) // 为连接监听器加锁,以便线程安全的访问 #define LEV_OPT_THREADSAFE (1u<<4) // 以禁用状态创建连接监听器,后续必须通过evconnlistener_enable()启用 #define LEV_OPT_DISABLED (1u<<5) // 提示libevent,如果可能的化,监听器应该延迟accept()连接,直到数据可用 #define LEV_OPT_DEFERRED_ACCEPT (1u<<6) // 提示允许多个服务器(进程/线程——绑定到相同端口 #define LEV_OPT_REUSEABLE_PORT (1u<<7) |
连接监听器依赖于event_base,当监听套接字上有新的TCP连接后,event_base会通知evconnlistener。
1 |
void evconnlistener_free(struct evconnlistener *lev); |
下面的函数可以禁止、重新允许连接监听器监听新连接:
1 2 |
int evconnlistener_disable(struct evconnlistener *lev); int evconnlistener_enable(struct evconnlistener *lev); |
1 |
void evconnlistener_set_cb(struct evconnlistener *lev, evconnlistener_cb cb, void *arg); |
1 2 3 4 |
// 获取监听套接字 evutil_socket_t evconnlistener_get_fd(struct evconnlistener *lev); // 获取关联的event_base struct event_base *evconnlistener_get_base(struct evconnlistener *lev); |
当监听器accept失败后,可以调用预先设置的错误回调:
1 2 |
typedef void (*evconnlistener_errorcb)(struct evconnlistener *lis, void *ptr); void evconnlistener_set_error_cb(struct evconnlistener *lev, evconnlistener_errorcb errorcb); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
#include <string.h> #include <errno.h> #include <stdio.h> #include <signal.h> #ifndef _WIN32 #include <netinet/in.h> # ifdef _XOPEN_SOURCE_EXTENDED # include <arpa/inet.h> # endif #include <sys/socket.h> #endif #include <event2/bufferevent.h> #include <event2/buffer.h> #include <event2/listener.h> #include <event2/util.h> #include <event2/event.h> static const char MESSAGE[] = "Hello, World!\n"; static const int PORT = 8080; static void listener_cb(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *); static void conn_writecb(struct bufferevent *, void *); static void conn_eventcb(struct bufferevent *, short, void *); static void signal_cb(evutil_socket_t, short, void *); int main(int argc, char **argv) { struct event_base *base; struct evconnlistener *listener; struct event *signal_event; struct sockaddr_in sin; // 创建event_base base = event_base_new(); if (!base) { // 初始化libevent失败 return 1; } memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(PORT); // 创建连接监听器 listener = evconnlistener_new_bind(base, listener_cb, (void *) base, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr *) &sin, sizeof(sin)); if (!listener) { // 创建连接监听器失败 return 1; } // 添加中断(Ctrl-C)信号事件 signal_event = evsignal_new(base, SIGINT, signal_cb, (void *) base); if (!signal_event || event_add(signal_event, NULL) < 0) { // 创建或添加信号事件失败 return 1; } // 开始执行事件循环,直到Ctrl-C不会退出 event_base_dispatch(base); evconnlistener_free(listener); // 销毁监听器 event_free(signal_event); // 销毁事件 event_base_free(base); // 销毁event_base return 0; } // 接收到新连接请求 static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { struct event_base *base = user_data; struct bufferevent *bev; // 为此连接套接字创建bufferevent bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); if (!bev) { // 创建失败,退出事件循环 event_base_loopbreak(base); return; } bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL); // 设置回调 bufferevent_enable(bev, EV_WRITE); // 启用写事件,支持向套接字写数据 bufferevent_disable(bev, EV_READ); // 禁用读事件 // 将需要发送给客户端的数据填充到bufferevent的缓冲中 // 当文件描述符(客户端连接)可写时自动刷出 bufferevent_write(bev, MESSAGE, strlen(MESSAGE)); } // 可以向底层连接写入数据了 static void conn_writecb(struct bufferevent *bev, void *user_data) { struct evbuffer *output = bufferevent_get_output(bev); if (evbuffer_get_length(output)==0) { // 输出缓冲已经自动刷空 // 关闭bufferevent,这里会导致底层连接被关闭 bufferevent_free(bev); } } static void conn_eventcb(struct bufferevent *bev, short events, void *user_data) { if (events & BEV_EVENT_EOF) { // 连接被关闭 } else if (events & BEV_EVENT_ERROR) { // 连接上出现错误 } bufferevent_free(bev); } static void signal_cb(evutil_socket_t sig, short events, void *user_data) { struct event_base *base = user_data; // 接收到信号,两秒后退出事件循环 struct timeval delay = {2, 0}; event_base_loopexit(base, &delay); } |
Leave a Reply