cpp-doc-libevent使用

介绍

libevent是一个轻量级的开源的高性能的事件触发的网络库,适用于windows、linux、bsd等多种平台,内部使用select、epoll、kqueue等系统调用管理事件机制。

它被众多的开源项目使用,例如大名鼎鼎的memcached等。

特点:

  • 事件驱动,高性能;
  • 轻量级,专注于网络(相对于ACE);
  • 开放源码,代码相当精炼、易读;
  • 跨平台,支持Windows、Linux、BSD和Mac OS;
  • 支持多种I/O多路复用技术(epoll、poll、dev/poll、select和kqueue等),在不同的操作系统下,做了多路复用模型的抽象,可以选择使用不同的模型,通过事件函数提供服务;
  • 支持I/O,定时器和信号等事件;
  • 采用Reactor模式;

普通的函数调用机制:程序调用某个函数,函数执行,程序等待,函数将结果返回给调用程序(如果含有函数返回值的话),也就是顺序执行的。

Reactor模式的基本流程:应用程序需要提供相应的接口并且注册到reactor反应器上,如果相应的事件发生的话,那么reactor将自动调用相应的注册的接口函数(类似于回调函数)通知你,所以libevent是事件触发的网络库。

功能

Libevent提供了事件通知,io缓存事件,定时器,超时,异步解析dns,事件驱动的http server以及一个rpc框架。

  • 事件通知:当文件描述符可读可写时将执行回调函数。

  • IO缓存:缓存事件提供了输入输出缓存,能自动的读入和写入,用户不必直接操作io。

  • 定时器:libevent提供了定时器的机制,能够在一定的时间间隔之后调用回调函数。

  • 信号:触发信号,执行回调。

  • 异步的dns解析:libevent提供了异步解析dns服务器的dns解析函数集。

  • 事件驱动的http服务器:libevent提供了一个简单的,可集成到应用程序中的HTTP服务器。

  • RPC客户端服务器框架:libevent为创建RPC服务器和客户端创建了一个RPC框架,能自动的封装和解封数据结构。

安装步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
# 下载源码包
https://github.com/libevent/libevent/releases/download/release-2.1.12-stable/libevent-2.1.12-stable.tar.gz
# 解压
tar zxvf libevent-2.1.12-stable.tar.gz
# 配置安装路径
cd libevent-2.1.12-stable
./configure --disable-openssl
# 编译并安装
make
make install

# 测试libevent是否安装成功:
ls -la /usr/local/include | grep event

相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建事件集
struct event_base *event_base_new(void);

// 创建事件
struct event event_new(struct event_base ,evutil_socket_t ,short ,event_callback_fn, void*)

// 添加事件
int event_add(struct event * ev,const struct timeval* timeout)

// 删除事件
int event_del(struct event *)

// 事件循环
int event_base_loop(struct event_base *base, int flags)

// 事件派送
int event_base_dispatch(struct event_base *event_base)

例子:echo

client.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<errno.h>
#include<unistd.h>

#include<stdio.h>
#include<string.h>
#include<stdlib.h>

#include<event.h>
#include<event2/util.h>

int connect_server(const char* server_ip, int port);
void cmd_read_data(int fd, short events, void* arg);
void socket_read_data(int fd, short events, void *arg);

int main(int argc, char** argv)
{
if( argc < 3 )
{
printf("please input 2 parameters\n");
return -1;
}

//两个参数依次是服务器端的IP地址、端口号
int sockfd = connect_server(argv[1], atoi(argv[2]));
if( sockfd == -1)
{
perror("tcp_connect error ");
return -1;
}

printf("connect to server successfully\n");

struct event_base* base = event_base_new();

struct event *ev_sockfd = event_new(base, sockfd,
EV_READ | EV_PERSIST,
socket_read_data, NULL);
event_add(ev_sockfd, NULL);

//监听终端输入事件
struct event* ev_cmd = event_new(base, STDIN_FILENO,
EV_READ | EV_PERSIST, cmd_read_data,
(void*)&sockfd);

event_add(ev_cmd, NULL);

event_base_dispatch(base);

printf("finished \n");
return 0;
}


void cmd_read_data(int fd, short events, void* arg)
{
char msg[1024];

int ret = read(fd, msg, sizeof(msg)-1);
if( ret <= 0 )
{
perror("read fail ");
exit(1);
}

int sockfd = *((int*)arg);
if(msg[ret - 1]=='\n') msg[ret - 1] = '\0';
else msg[ret] = '\0';
// 把终端的消息发送给服务器端,客户端忽略性能考虑,直接利用阻塞方式发送
printf("write to server>>> [%s]\n", msg);
write(sockfd, msg, ret);
}


void socket_read_data(int fd, short events, void *arg)
{
char msg[1024];

// 为了简单起见,不考虑读一半数据的情况
int len = read(fd, msg, sizeof(msg)-1);
if( len == 0 )
{
printf("connection close. exit~\n");
exit(1);
}else if(len < 0){
perror("read fail ");
return ;
}

msg[len] = '\0';

printf("recv from server<<<<< [%s] \n", msg);
}


typedef struct sockaddr SA;
int connect_server(const char* server_ip, int port)
{
int sockfd, status, save_errno;
struct sockaddr_in server_addr;

memset(&server_addr, 0, sizeof(server_addr) );

server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
status = inet_aton(server_ip, &server_addr.sin_addr);

if( status == 0 ) //the server_ip is not valid value
{
errno = EINVAL;
return -1;
}

sockfd = socket(PF_INET, SOCK_STREAM, 0);
if( sockfd == -1 )
return sockfd;


status = connect(sockfd, (SA*)&server_addr, sizeof(server_addr) );

if( status == -1 )
{
save_errno = errno;
close(sockfd);
errno = save_errno; //the close may be error
return -1;
}

//evutil_make_socket_nonblocking(sockfd);

return sockfd;
}
server.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>

#include <unistd.h>
#include <event.h>
#include <event2/event.h>
#include <assert.h>

#define BUFLEN 1024

typedef struct _ConnectStat {
struct event* ev;
char buf[BUFLEN];
}ConnectStat;

//echo 服务实现相关代码
ConnectStat * stat_init(int fd, struct event *ev);

void accept_connection(int fd, short events, void* arg);
void do_echo_request(int fd, short events, void *arg);
void do_echo_response(int fd, short events, void *arg);

int tcp_server_init(int port, int listen_num);

struct event_base* base;

int main(int argc, char** argv)
{

int listener = tcp_server_init(9999, 10);
if( listener == -1 )
{
perror(" tcp_server_init error ");
return -1;
}

base = event_base_new();

//添加监听客户端请求连接事件
struct event* ev_listen = event_new(base, listener, EV_READ | EV_PERSIST,
accept_connection, base);
event_add(ev_listen, NULL);


event_base_dispatch(base);

return 0;
}

ConnectStat * stat_init(int fd, struct event *ev) {
ConnectStat * temp = NULL;
temp = (ConnectStat *)malloc(sizeof(ConnectStat));

if (!temp) {
fprintf(stderr, "malloc failed. reason: %m\n");
return NULL;
}

memset(temp, '\0', sizeof(ConnectStat));
temp->ev = ev;

}

void accept_connection(int fd, short events, void* arg)
{
evutil_socket_t sockfd;

struct sockaddr_in client;
socklen_t len = sizeof(client);

sockfd = accept(fd, (struct sockaddr*)&client, &len );
evutil_make_socket_nonblocking(sockfd);

printf("accept a client %d\n", sockfd);

struct event_base* base = (struct event_base*)arg;


//仅仅是为了动态创建一个event结构体
struct event *ev = event_new(NULL, -1, 0, NULL, NULL);
ConnectStat *stat = stat_init(sockfd, ev);

//将动态创建的结构体作为event的回调参数
event_assign(ev, base, sockfd, EV_READ ,
do_echo_request, (void*)stat);

event_add(ev, NULL);
}

void do_echo_request(int fd, short events, void *arg)
{

ConnectStat *stat = (ConnectStat *)arg;
struct event *ev = stat->ev;
char *msg = stat->buf;
printf("do echo request ...\n");

int len = read(fd, msg, BUFLEN - 1);

if( len <= 0 )
{
printf("connectin close or some error happen when read\n");
event_free(ev);
close(fd);
free(stat);
return ;
}

msg[len] = '\0';
printf("recv from client<<<< %s\n", msg);

event_set(ev, fd, EV_WRITE, do_echo_response, (void*)stat);
ev->ev_base = base;
event_add(ev, NULL);
//write(fd, reply_msg, strlen(reply_msg) );
}

void do_echo_response(int fd, short events, void *arg) {
ConnectStat * stat = (ConnectStat *)(arg);
struct event *ev = NULL;

assert(stat!=NULL);
ev = stat->ev;

int len = strlen(stat->buf);
printf("write to client>>>> %s\n", stat->buf);
int _s = write(fd, stat->buf, len);
if (_s > 0)
{
printf("write successfully.\n");
}else if (_s == 0) //client:close
{
fprintf(stderr,"Remote connection[fd: %d] has been closed\n", fd);
goto ERR_EXIT;
}
else //err occurred.
{
fprintf(stderr,"read faield[fd: %d], reason:%s [%d]\n",fd , strerror(errno), _s);
goto ERR_EXIT;
}

event_set(ev, fd, EV_READ, do_echo_request, (void*)stat);
ev->ev_base = base;
event_add(ev, NULL);
return ;

ERR_EXIT:
event_free(ev);
close(fd);
free(stat);
}

typedef struct sockaddr SA;
int tcp_server_init(int port, int listen_num)
{
int errno_save;
evutil_socket_t listener;// int listener;

listener = socket(AF_INET, SOCK_STREAM, 0);
if( listener == -1 )
return -1;

//允许多次绑定同一个地址。要用在socket和bind之间
//等同于 int opt = 1;
//setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

evutil_make_listen_socket_reuseable(listener);

struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(port);

if( bind(listener, (SA*)&sin, sizeof(sin)) < 0 )
goto error;

if( listen(listener, listen_num) < 0)
goto error;

//跨平台统一接口,将套接字设置为非阻塞状态
evutil_make_socket_nonblocking(listener);

return listener;

error:
errno_save = errno;
evutil_closesocket(listener);
errno = errno_save;

return -1;
}

例子:改进echo-使用缓冲事件

server.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>

#include <unistd.h>
#include <event.h>
#include <event2/event.h>
#include <assert.h>

#define BUFLEN 1024

typedef struct _ConnectStat {
//struct event* ev;
struct bufferevent* bev;
char buf[BUFLEN];
}ConnectStat;

//echo 服务实现相关代码
ConnectStat * stat_init(int fd, struct bufferevent *bev);


void accept_connection(int fd, short events, void* arg);
void do_echo_request(struct bufferevent* bev, void* arg);
void event_cb(struct bufferevent *bev, short event, void *arg);
//void do_echo_response(int fd, short events, void *arg);

int tcp_server_init(int port, int listen_num);

struct event_base* base;

int main(int argc, char** argv)
{

int listener = tcp_server_init(9999, 10);
if( listener == -1 )
{
perror(" tcp_server_init error ");
return -1;
}

base = event_base_new();

//添加监听客户端请求连接事件
struct event* ev_listen = event_new(base, listener, EV_READ | EV_PERSIST,
accept_connection, base);
event_add(ev_listen, NULL);


event_base_dispatch(base);

return 0;
}

ConnectStat * stat_init(int fd, struct bufferevent *bev) {
ConnectStat * temp = NULL;
temp = (ConnectStat *)malloc(sizeof(ConnectStat));

if (!temp) {
fprintf(stderr, "malloc failed. reason: %m\n");
return NULL;
}

memset(temp, '\0', sizeof(ConnectStat));
temp->bev = bev;

}

void accept_connection(int fd, short events, void* arg)
{
evutil_socket_t sockfd;

struct sockaddr_in client;
socklen_t len = sizeof(client);

sockfd = accept(fd, (struct sockaddr*)&client, &len );
evutil_make_socket_nonblocking(sockfd);

printf("accept a client %d\n", sockfd);

struct event_base* base = (struct event_base*)arg;


struct bufferevent* bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE);
ConnectStat *stat = stat_init(sockfd, bev);

bufferevent_setcb(bev, do_echo_request, NULL, event_cb, stat);
bufferevent_enable(bev, EV_READ | EV_PERSIST);

}


void do_echo_request(struct bufferevent* bev, void* arg)
{

ConnectStat *stat = (ConnectStat *)arg;
char *msg = stat->buf;
printf("do echo request ...\n");

size_t len = bufferevent_read(bev, msg, BUFLEN);

msg[len] = '\0';
printf("recv from client<<<< %s\n", msg);

bufferevent_write(bev, msg, strlen(msg));
}


void event_cb(struct bufferevent *bev, short event, void *arg)
{
ConnectStat *stat = (ConnectStat *)arg;

if (event & BEV_EVENT_EOF)
printf("connection closed\n");
else if (event & BEV_EVENT_ERROR)
printf("some other error\n");

//这将自动close套接字和free读写缓冲区
bufferevent_free(bev);
free(stat);
}

typedef struct sockaddr SA;
int tcp_server_init(int port, int listen_num)
{
int errno_save;
evutil_socket_t listener;// int listener;

listener = socket(AF_INET, SOCK_STREAM, 0);
if( listener == -1 )
return -1;

//允许多次绑定同一个地址。要用在socket和bind之间
//等同于 int opt = 1;
//setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

evutil_make_listen_socket_reuseable(listener);

struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(port);

if( bind(listener, (SA*)&sin, sizeof(sin)) < 0 )
goto error;

if( listen(listener, listen_num) < 0)
goto error;


//跨平台统一接口,将套接字设置为非阻塞状态
evutil_make_socket_nonblocking(listener);

return listener;

error:
errno_save = errno;
evutil_closesocket(listener);
errno = errno_save;

return -1;
}

例子:改进echo-使用缓冲+accetp

server.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>

#include <unistd.h>
#include <event.h>
#include <event2/event.h>
#include<event2/listener.h>
#include <assert.h>

#define BUFLEN 1024

typedef struct _ConnectStat {
//struct event* ev;
struct bufferevent* bev;
char buf[BUFLEN];
}ConnectStat;

//echo 服务实现相关代码
ConnectStat * stat_init(int fd, struct bufferevent *bev);

void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sock, int socklen, void *arg);

//accept_connection(int fd, short events, void* arg);
void do_echo_request(struct bufferevent* bev, void* arg);
void event_cb(struct bufferevent *bev, short event, void *arg);
//void do_echo_response(int fd, short events, void *arg);

int tcp_server_init(int port, int listen_num);

struct event_base* base;

int main(int argc, char** argv)
{
struct sockaddr_in sin;
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = htons(9999);

base = event_base_new();

struct evconnlistener *listener
= evconnlistener_new_bind(base, listener_cb, base,
LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,
10, (struct sockaddr*)&sin,
sizeof(struct sockaddr_in));

event_base_dispatch(base);

evconnlistener_free(listener);
event_base_free(base);

return 0;
}

ConnectStat * stat_init(int fd, struct bufferevent *bev) {
ConnectStat * temp = NULL;
temp = (ConnectStat *)malloc(sizeof(ConnectStat));

if (!temp) {
fprintf(stderr, "malloc failed. reason: %m\n");
return NULL;
}

memset(temp, '\0', sizeof(ConnectStat));
temp->bev = bev;

}

/*
一个新客户端连接上服务器此函数就会被调用,当此函数被调用时,libevent已经帮我们accept了这个客户端。
该客户端的文件描述符为fd
*/
void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sock, int socklen, void *arg)
{
printf("accept a client %d\n", fd);

struct event_base *base = (struct event_base*)arg;

//为这个客户端分配一个bufferevent
struct bufferevent *bev = bufferevent_socket_new(base, fd,
BEV_OPT_CLOSE_ON_FREE);
ConnectStat *stat = stat_init(fd, bev);

bufferevent_setcb(bev, do_echo_request, NULL, event_cb, stat);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
}


void do_echo_request(struct bufferevent* bev, void* arg)
{

ConnectStat *stat = (ConnectStat *)arg;
char *msg = stat->buf;
printf("do echo request ...\n");

size_t len = bufferevent_read(bev, msg, BUFLEN);

msg[len] = '\0';
printf("recv from client<<<< %s\n", msg);

bufferevent_write(bev, msg, strlen(msg));
}

void event_cb(struct bufferevent *bev, short event, void *arg)
{
ConnectStat *stat = (ConnectStat *)arg;

if (event & BEV_EVENT_EOF)
printf("connection closed\n");
else if (event & BEV_EVENT_ERROR)
printf("some other error\n");

//这将自动close套接字和free读写缓冲区
bufferevent_free(bev);
free(stat);
}

相关网站

官网