bufferevent 与 socket

http://blog.sina.com.cn/s/blog_56dee71a0100qx4s.html

 

很多时候,除了响应事件之外,应用还希望做一定的数据缓冲。比如说,写入数据的时候,通常的运行模式是:

l 决定要向连接写入一些数据,把数据放入到缓冲区中

l 等待连接可以写入

l 写入尽量多的数据

l 记住写入了多少数据,如果还有更多数据要写入,等待连接再次可以写入

这种缓冲IO模式很通用,libevent为此提供了一种通用机制,即bufferevent。bufferevent由一个底层的传输端口(如套接字),一个读取缓冲区和一个写入缓冲区组成。与通常的事件在底层传输端口已经就绪,可以读取或者写入的时候执行回调不同的是,bufferevent在读取或者写入了足够量的数据之后调用用户提供的回调。

 

 

 

bufferevent 的简单范例

这里选取了 Libevent 的一个范例程序 hello-world.c 来看看 Libevent 的用法:

#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifndef WIN32
#include <netinet/in.h> # ifdef _XOPEN_SOURCE_EXTENDED # include <arpa/inet.h> # endif #include <sys/socket.h> #endif // bufferevent #include <event2/bufferevent.h> // bufferevent 使用的 buffer #include <event2/buffer.h> // 连接监听器 #include <event2/listener.h> #include <event2/util.h> #include <event2/event.h> static const char MESSAGE[] = "Hello, World!\n"; static const int PORT = 9995; // 新连接到来时的回调 static void listener_cb(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *); // 读取回调 static void conn_writecb(struct bufferevent *, void *); // 事件回调 static void conn_eventcb(struct bufferevent *, short, void *); // 信号回调 static void signal_cb(evutil_socket_t, short, void *); int main(int argc, char **argv) {  struct event_base *base;  struct evconnlistener *listener;  struct event *signal_event;  struct sockaddr_in sin; #ifdef WIN32  WSADATA wsa_data;  WSAStartup(0x0201, &wsa_data); #endif  // 首先构建 base  base = event_base_new();  if (!base) {   fprintf(stderr, "Could not initialize libevent!\n");   return 1;  }  memset(&sin, 0, sizeof(sin));  sin.sin_family = AF_INET;  sin.sin_port = htons(PORT);  // 创建监听器  listener = evconnlistener_new_bind(base, listener_cb, (void *)base,  LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,  (struct sockaddr*)&sin,  sizeof(sin));  if (!listener) {   fprintf(stderr, "Could not create a listener!\n");   return 1;  }  // 中断信号  signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);  if (!signal_event || event_add(signal_event, NULL)<0) {   fprintf(stderr, "Could not create/add a signal event!\n");   return 1;  }  event_base_dispatch(base);  evconnlistener_free(listener);  event_free(signal_event);  event_base_free(base);  printf("done\n");  return 0; } static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) {  struct event_base *base = user_data;  struct bufferevent *bev;  // 得到一个新的连接,通过连接 fd 构建一个 bufferevent  bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);  if (!bev) {   fprintf(stderr, "Error constructing bufferevent!");   event_base_loopbreak(base);   return;  }  // 设置创建的 bufferevent 的回调函数  bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);  bufferevent_enable(bev, EV_WRITE);  bufferevent_disable(bev, EV_READ);  // 写入数据到 bufferevent 中  bufferevent_write(bev, MESSAGE, strlen(MESSAGE)); } static void conn_writecb(struct bufferevent *bev, void *user_data) {  struct evbuffer *output = bufferevent_get_output(bev);  if (evbuffer_get_length(output) == 0) {   printf("flushed answer\n");   bufferevent_free(bev);  } } static void conn_eventcb(struct bufferevent *bev, short events, void *user_data) {  if (events & BEV_EVENT_EOF) {   printf("Connection closed.\n");  } else if (events & BEV_EVENT_ERROR) {   printf("Got an error on the connection: %s\n",   strerror(errno));/*XXX win32*/  }  /* None of the other events can happen here, since we haven't enabled  * timeouts */  bufferevent_free(bev); } static void signal_cb(evutil_socket_t sig, short events, void *user_data) {  struct event_base *base = user_data;  struct timeval delay = { 2, 0 };  printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");  // 停止事件循环  event_base_loopexit(base, &delay); }

研究 bufferevent 的关键代码

这里只研究基于 socket 的 bufferevent。从上面 bufferevent 的使用可以看出,有几个关键函数:

  1. 开始需要调用 bufferevent_socket_new 创建一个 bufferevent
  2. 调用 bufferevent_setcb 设置回调函数
  3. 调用 bufferevent_write 写入数据
  4. 调用 bufferevent_free 释放 bufferevent

bufferevent_socket_new 的源码以及分析如下:

// base --- 新创建的 bufferevent 关联的 base
// fd --- bufferevent 关联的文件描述符
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,int options) {  // bufferevent_private 结构体持有 bufferevent 的数据  struct bufferevent_private *bufev_p;  // bufev == &(bufev_p->bev);  // struct bufferevent 中存放的是不同类型的 bufferevent 所共有的部分  // struct bufferevent 是 struct bufferevent_private 的子集  struct bufferevent *bufev;  // windows 下如果启用 IOCP 则构建异步 IO bufferevent #ifdef WIN32  if (base && event_base_get_iocp(base))   // 细节略   return bufferevent_async_new(base, fd, options); #endif  if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)   return NULL;  // 初始化 bufferevent_private  // 由于 bufferevent 有不同类型,所以这里设计了 bufferevent_ops_socket  // 对于不同类型的 bufferevent 有不同的 bufferevent_ops_socket 对象  // bufferevent_ops_socket 包括函数指针和一些信息  if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,     options) < 0) {   mm_free(bufev_p);   return NULL;  }  bufev = &bufev_p->bev;  // 设置 EVBUFFER_FLAG_DRAINS_TO_FD,此选项和 evbuffer_add_file() 函数有关(详见文档)  evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);  // 初始化 read 和 write event  // 一个 bufferevent(一个 fd)关联两个 event 对象 ev_read 和 ev_write  // ev_read --- socket 可读或者超时  // ev_write --- socket 可写或者超时  // 它们都未使用 Edge triggered 方式  event_assign(&bufev->ev_read, bufev->ev_base, fd,  EV_READ|EV_PERSIST, bufferevent_readcb, bufev);  event_assign(&bufev->ev_write, bufev->ev_base, fd,  EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);  // 为输出缓冲区设置回调  // 当输出缓冲区被修改时调用 bufferevent_socket_outbuf_cb 回调函数  evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);  // 防止输入缓冲区和输出缓冲区被意外修改  evbuffer_freeze(bufev->input, 0);  evbuffer_freeze(bufev->output, 1);  return bufev; }

其中 bufferevent_init_common 函数实现为:

int
bufferevent_init_common(struct bufferevent_private *bufev_private,struct event_base *base,const struct bufferevent_ops *ops, enum bufferevent_options options) {  struct bufferevent *bufev = &bufev_private->bev;  // 创建输入缓冲区  if (!bufev->input) {   if ((bufev->input = evbuffer_new()) == NULL)    return -1;  }  // 创建输出缓冲区  if (!bufev->output) {   if ((bufev->output = evbuffer_new()) == NULL) {    evbuffer_free(bufev->input);    return -1;   }  }  // 初始化 bufferevent 的引用计数  bufev_private->refcnt = 1;  bufev->ev_base = base;  /* Disable timeouts. */  // 清理超时时间  evutil_timerclear(&bufev->timeout_read);  evutil_timerclear(&bufev->timeout_write);  bufev->be_ops = ops;  /*  * Set to EV_WRITE so that using bufferevent_write is going to  * trigger a callback. Reading needs to be explicitly enabled  * because otherwise no data will be available.  */  // enabled 被 bufferevent_get_enabled 函数返回  // enabled 的值可以为 EV_WRITE EV_READ  bufev->enabled = EV_WRITE;  // bufferevent 相关线程初始化 #ifndef _EVENT_DISABLE_THREAD_SUPPORT  if (options & BEV_OPT_THREADSAFE) {   if (bufferevent_enable_locking(bufev, NULL) < 0) {    /* cleanup */    evbuffer_free(bufev->input);    evbuffer_free(bufev->output);    bufev->input = NULL;    bufev->output = NULL;    return -1;   }  } #endif  // 选项正确性检查  if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))  == BEV_OPT_UNLOCK_CALLBACKS) {   event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");   return -1;  }  // defer callbacks 初始化  if (options & BEV_OPT_DEFER_CALLBACKS) {   if (options & BEV_OPT_UNLOCK_CALLBACKS)    event_deferred_cb_init(&bufev_private->deferred,    bufferevent_run_deferred_callbacks_unlocked,    bufev_private);   else    event_deferred_cb_init(&bufev_private->deferred,    bufferevent_run_deferred_callbacks_locked,    bufev_private);  }  bufev_private->options = options;  // 关联 bufferevent 和 buffer  evbuffer_set_parent(bufev->input, bufev);  evbuffer_set_parent(bufev->output, bufev);  return 0; }

bufferevent 创建成功之后,fd 上存在数据可读则调用 bufferevent_readcb

// fd 可读
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg) {  struct bufferevent *bufev = arg;  struct bufferevent_private *bufev_p =  EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);  struct evbuffer *input;  int res = 0;  short what = BEV_EVENT_READING;  ev_ssize_t howmuch = -1, readmax=-1;  _bufferevent_incref_and_lock(bufev);  // 如果超时了  if (event == EV_TIMEOUT) {   /* Note that we only check for event==EV_TIMEOUT. If   * event==EV_TIMEOUT|EV_READ, we can safely ignore the   * timeout, since a read has occurred */   what |= BEV_EVENT_TIMEOUT;   goto error;  }  input = bufev->input;  /*  * If we have a high watermark configured then we don't want to  * read more data than would make us reach the watermark.  */  // 是否设置了输入缓冲区的最大大小  if (bufev->wm_read.high != 0) {   howmuch = bufev->wm_read.high - evbuffer_get_length(input);   /* we somehow lowered the watermark, stop reading */   // 缓冲区中数据过多   if (howmuch <= 0) {    // 暂停 bufferevent 的数据读取    // 具体的做法是移除 read event(ev_read)    bufferevent_wm_suspend_read(bufev);    goto done;   }  }  // 获取可读最大大小  // 和限速有关,如果不限速,则为 MAX_TO_READ_EVER(16384) 也就是 16K  readmax = _bufferevent_get_read_max(bufev_p);  if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"      * uglifies this code. XXXX */   howmuch = readmax;  // 如果读取暂停  if (bufev_p->read_suspended)   goto done;  // 输入缓冲区可读  evbuffer_unfreeze(input, 0);  // 读取 fd 上的数据  res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */  // 输入缓冲区禁止读取  evbuffer_freeze(input, 0);  // 读取数据失败  if (res == -1) {   // 获取到错误   int err = evutil_socket_geterror(fd);   // EINTR、EAGAIN   // Windows 下为 WSAEWOULDBLOCK、WSAEINTR   if (EVUTIL_ERR_RW_RETRIABLE(err))    goto reschedule;   // 如果错误是不可重试的,报错   /* error case */   what |= BEV_EVENT_ERROR;  // eof  } else if (res == 0) {   /* eof case */   what |= BEV_EVENT_EOF;  }  if (res <= 0)   goto error;  _bufferevent_decrement_read_buckets(bufev_p, res);  /* Invoke the user callback - must always be called last */  // 判断是否需要调用回调  if (evbuffer_get_length(input) >= bufev->wm_read.low)   _bufferevent_run_readcb(bufev);  goto done; reschedule:  goto done; error:  // 出错后暂停读取数据  bufferevent_disable(bufev, EV_READ);  // 通过事件回调通知出错  _bufferevent_run_eventcb(bufev, what); done:  _bufferevent_decref_and_unlock(bufev); }

这里比较关键的函数为 evbuffer_read:

int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
  struct evbuffer_chain **chainp;
 int n;  int result; #ifdef USE_IOVEC_IMPL  int nvecs, i, remaining; #else  struct evbuffer_chain *chain;  unsigned char *p; #endif  EVBUFFER_LOCK(buf);  // buffer 是否可读  if (buf->freeze_end) {   result = -1;   goto done;  }  // 获取当前 socket 可读的字节数  n = get_n_bytes_readable_on_socket(fd);  if (n <= 0 || n > EVBUFFER_MAX_READ)   n = EVBUFFER_MAX_READ;  // 尽可能多的读取  if (howmuch < 0 || howmuch > n)   howmuch = n;  // 一种实现 #ifdef USE_IOVEC_IMPL  /* Since we can use iovecs, we're willing to use the last  * NUM_READ_IOVEC chains. */  // 调整缓冲区(细节略)  if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {   result = -1;   goto done;  } else {   IOV_TYPE vecs[NUM_READ_IOVEC]; #ifdef _EVBUFFER_IOVEC_IS_NATIVE   nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,   NUM_READ_IOVEC, &chainp, 1); #else   /* We aren't using the native struct iovec. Therefore,   we are on win32. */   struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];   nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,   &chainp, 1);   for (i=0; i < nvecs; ++i)    WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]); #endif #ifdef WIN32   {    // 读取到的数据字节数    DWORD bytesRead;    DWORD flags=0;    // windows 下进行 recv    if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {     /* The read failed. It might be a close,     * or it might be an error. */     // 这里使用 WSARecv 时需要注意 WSAECONNABORTED 表示了连接关闭了     if (WSAGetLastError() == WSAECONNABORTED)      n = 0;     else      n = -1;    } else     n = bytesRead;   } #else   // 非 windows 平台 read   n = readv(fd, vecs, nvecs); #endif  }  // 使用另外一种实现 #else /*!USE_IOVEC_IMPL*/  /* If we don't have FIONREAD, we might waste some space here */  /* XXX we _will_ waste some space here if there is any space left  * over on buf->last. */  if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {   result = -1;   goto done;  }  /* We can append new data at this point */  p = chain->buffer + chain->misalign + chain->off;  // read #ifndef WIN32  n = read(fd, p, howmuch); #else  n = recv(fd, p, howmuch, 0); #endif #endif /* USE_IOVEC_IMPL */  if (n == -1) {   result = -1;   goto done;  }  if (n == 0) {   result = 0;   goto done;  } #ifdef USE_IOVEC_IMPL  remaining = n;  for (i=0; i < nvecs; ++i) {   ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);   if (space < remaining) {    (*chainp)->off += space;   

转载于:https://www.cnblogs.com/diegodu/p/4779114.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/374538.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Codeforces Round #102 (Div. 1) A. Help Farmer 暴力分解

A. Help Farmer题目连接&#xff1a; http://www.codeforces.com/contest/142/problem/A Description Once upon a time in the Kingdom of Far Far Away lived Sam the Farmer. Sam had a cow named Dawn and he was deeply attached to her. Sam would spend the whole summe…

电力电子、电机控制系统的建模和仿真_清华团队研发,首款国产电力电子仿真软件来啦~已捐赠哈工大、海工大、清华使用!...

点击上方电气小青年&#xff0c;关注并星标由于微信改版&#xff0c;只有星标才能及时看到我们的消息哦━━━━━━推荐阅读&#xff1a;《膜拜大神&#xff01;清华大学电机系2021年接收推荐免试直硕(博)生拟录取名单公示&#xff01;》《滴滴程序员年薪80万被鄙视不如在二本…

JVM如何处理锁

当我们谈论最新版本的Sun Hotspot Java虚拟机1.6时&#xff0c;当您尝试从java.util.concurrent.locks.Lock实现获取锁或输入同步块时&#xff0c;JVM将执行以下三种锁类型&#xff1a; 有偏见的 &#xff1a;有时即使在并发系统中也没有争用&#xff0c;并且在这种情况下&…

基于node.js及express实现中间件,实现post、get

首先&#xff0c;当然是有必要的环境&#xff0c;安装node&#xff0c;这个我就不多说了。 依赖模块&#xff1a; "express": "^4.13.4", "request": "^2.72.0", "body-parser": "^1.13.3",页面 $.ajax({type: &q…

可视化分析之图表选择

转载于:https://www.cnblogs.com/yymn/p/4783631.html

定义并调用函数输出 fibonacci 序列_科学网—Zmn-0351 薛问天:再谈数学概念的定义,评新华先生《0345》...

Zmn-0351 薛问天&#xff1a;再谈数学概念的定义&#xff0c;评新华先生《0345》【编者按。下面是薛问天先生发来的文章。是对《Zmn-0345》新华先生文章的评论。现在发布如下&#xff0c;供网友们共享。请大家关注并积极评论。另外本《专栏》重申&#xff0c;这里纯属学术讨论&…

Java和内存泄漏

总览 术语“内存泄漏”在Java中的使用方式不同于在其他语言中使用的方式。 通用术语中的“内存泄漏”是什么意思&#xff0c;在Java中如何使用&#xff1f; 维基百科的定义 当计算机程序消耗内存但无法将其释放回操作系统时&#xff0c;就会发生计算机科学中的内存泄漏&#x…

453. 最小操作次数使数组元素相等

给你一个长度为 n 的整数数组&#xff0c;每次操作将会使 n - 1 个元素增加 1 。返回让数组所有元素相等的最小操作次数。 class Solution {public int minMoves(int[] nums) {int res 0;int sum 0;int n nums.length;for(int i 0;i<n;i){sum nums[i];}res sum - min…

第二章 TCP/IP 基础知识

第二章 TCP/IP 基础知识 TCP/IP transmission control protocol and ip internet protocol 是互联网众多通信协议中最为著名的。 2.2 TCP/IP 的标准化 2.2.2 TCP/IP 标准化精髓 TCP/IP 协议始终具有很强的实用性。 相比于TCP/IP &#xff0c;OSI 之所以未能达到普及&#xff0…

CSS太阳月亮地球三角恋旋转效果

纯粹玩一下&#xff0c;好像没有什么实际的卵用&#xff0c;but&#xff0c;纯玩买不了上当&#xff0c;纯玩买不了受骗。。。。。。。。 地月旋转的一个css效果&#xff0c;无聊玩玩&#xff0c;可以复制到记事本试试 <!DOCTYPE html><html lang"en">&l…

gorm preload 搜索_LeetCode刷题笔记|95:不同的二叉搜索树 II

题目描述给定一个整数 n&#xff0c;生成所有由 1 ... n 为节点所组成的 二叉搜索树 。示例输入&#xff1a;3输出&#xff1a;[[1,null,3,2],[3,2,null,1],[3,1,null,null,2],[2,1,3],[1,null,2,null,3]]解释&#xff1a;以上的输出对应以下 5 种不同结构的二叉搜索树&#xf…

Java初学者指南

Java编程的第一步。 对于Java中的入门教程&#xff0c;请参阅Sun的官方帮助这里 除了核心语言外&#xff0c;还有几种技术和API 介绍。 我们建议首先阅读涵盖 基础知识&#xff0c;并继续其余的教程。 我们建议&#xff1a; 保持代码简单易读 拆分逻辑组件&#xff08;类…

Javascript中Promise对象的实现

http://segmentfault.com/a/1190000000684654 http://www.infoq.com/cn/news/2011/09/js-promise/转载于:https://www.cnblogs.com/zuiyirenjian/p/4787864.html

字符串分割与存入List集合

List<string> namelist new List<string>(); string[] namejh null; string name "张三李四王五"; 第一步&#xff1a;将三个名字分离出来 namejh name.Split("".ToCharArray(), StringSplitOptions.RemoveEmptyEntries); namelist new Li…

GTJ2018如何导出全部工程量_如何成为优秀的造价员?广联达编制内刊手册,造价员算量高手秘籍...

如何成为优秀的造价员&#xff1f;广联达编制内刊手册&#xff0c;造价员算量高手秘籍[高手秘籍]是广联达课程编制委员会暨直播委员会精心打造的&#xff0c;能够“让您深入理解软件计算、设置等原理,遇到问题有处理思路,以常见问题为导向&#xff0c;重点进行原因分析&#xf…

带有Spring,Hibernate,Akka,Twitter Bootstrap,Apache Tiles和jQuery的Maven Web项目Kickstarter代码库...

我很高兴将第二个项目上传到GitHub&#xff0c;以帮助人们尽快开始Java Web App开发。 我正在与Apache License 2.0共享此代码。 这是相同的网址&#xff1a; https://github.com/ykameshrao/spring-hibernate-springdata-springmvc-maven-project-framework 该项目包括以下部…

git项目添加.gitigore文件

以前一直没有注意这个文件&#xff0c;最近读到了黄勇的《架构探险》&#xff0c;觉得这个文件还是很有用的。 .gitigore文件可以自己配置。 我使用的是书中所用的配置&#xff0c;简洁明了。 # Maven # target/#log# logs/# IDEA # .idea/ *.iml# Eclipse # .settings/ .metad…

463. 岛屿的周长

给定一个 row x col 的二维网格地图 grid &#xff0c;其中&#xff1a;grid[i][j] 1 表示陆地&#xff0c; grid[i][j] 0 表示水域。 网格中的格子 水平和垂直 方向相连&#xff08;对角线方向不相连&#xff09;。整个网格被水完全包围&#xff0c;但其中恰好有一个岛屿&a…

C++服务器设计(七):聊天系统服务端实现

在之前的章节中&#xff0c;我们对服务端系统的设计实现原理进行了剖析&#xff0c;在这一章中&#xff0c;我们将对服务端框架进行实际运用&#xff0c;实现一款运行于内网环境的聊天系统。该聊天系统由客户端与服务器两部分组成&#xff0c;同时服务端通过数据库维护用户的账…

高校实验室管理系统_史上最全面的实验室信息管理系统(LIMS)全解

1. LIMS的基本概念和发展状况1.1 概括LIMS实验室管理系统是为实验、检测等业务板块提供流程化、模块化、标准化操作管理系统&#xff0c;打造基于行业法规的实验室全流程质量控制管理系统&#xff0c;实现实验室“人、机、料、法、环”关键环节管理。1.2 发展状况随着科研规范化…