Python异步非阻塞IO多路复用Select/Poll/Epoll使用

来源:http://www.haiyun.me/archives/1056.html

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import select
import socket</span>
import Queueserver = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)
server_address= ('192.168.1.5',8080)
server.bind(server_address)
server.listen(10)#select轮询等待读socket集合
inputs = [server]
#select轮询等待写socket集合
outputs = []
message_queues = {}
#select超时时间
timeout = 20while True:print "等待活动连接......"readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)if not (readable or writable or exceptional) :print "select超时无活动连接,重新select...... "continue;   #循环可读事件for s in readable :#如果是server监听的socketif s is server:#同意连接connection, client_address = s.accept()print "新连接: ", client_addressconnection.setblocking(0)#将连接加入到select可读事件队列inputs.append(connection)#新建连接为key的字典,写回读取到的消息message_queues[connection] = Queue.Queue()else:#不是本机监听就是客户端发来的消息data = s.recv(1024)if data :print "收到数据:" , data , "客户端:",s.getpeername()message_queues[s].put(data)if s not in outputs:#将读取到的socket加入到可写事件队列outputs.append(s)else:#空白消息,关闭连接print "关闭连接:", client_addressif s in outputs :outputs.remove(s)inputs.remove(s)s.close()del message_queues[s]for s in writable:try:msg = message_queues[s].get_nowait()except Queue.Empty:print "连接:" , s.getpeername() , '消息队列为空'outputs.remove(s)else:print "发送数据:" , msg , "到", s.getpeername()s.send(msg)for s in exceptional:print "异常连接:", s.getpeername()inputs.remove(s)if s in outputs:outputs.remove(s)s.close()del message_queues[s]
Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket
import select
import Queueserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
server.bind(server_address)
server.listen(5)
print  "服务器启动成功,监听IP:" , server_address
message_queues = {}
#超时,毫秒
timeout = 5000
#监听哪些事件
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
#新建轮询事件对象
poller = select.poll()
#注册本机监听socket到等待可读事件事件集合
poller.register(server,READ_ONLY)
#文件描述符到socket映射
fd_to_socket = {server.fileno():server,}
while True:print "等待活动连接......"#轮询注册的事件集合events = poller.poll(timeout)if not events:print "poll超时,无活动连接,重新poll......"continueprint "有" , len(events), "个新事件,开始处理......"for fd ,flag in events:s = fd_to_socket[fd]#可读事件if flag & (select.POLLIN | select.POLLPRI) :if s is server :#如果socket是监听的server代表有新连接connection , client_address = s.accept()print "新连接:" , client_addressconnection.setblocking(False)fd_to_socket[connection.fileno()] = connection#加入到等待读事件集合poller.register(connection,READ_ONLY)message_queues[connection]  = Queue.Queue()else :#接收客户端发送的数据data = s.recv(1024)if data:print "收到数据:" , data , "客户端:" , s.getpeername()message_queues[s].put(data)#修改读取到消息的连接到等待写事件集合poller.modify(s,READ_WRITE)else :# Close the connectionprint "  closing" , s.getpeername()# Stop listening for input on the connectionpoller.unregister(s)s.close()del message_queues[s]#连接关闭事件elif flag & select.POLLHUP :print " Closing ", s.getpeername() ,"(HUP)"poller.unregister(s)s.close()#可写事件elif flag & select.POLLOUT :try:msg = message_queues[s].get_nowait()except Queue.Empty:print s.getpeername() , " queue empty"poller.modify(s,READ_ONLY)else :print "发送数据:" , data , "客户端:" , s.getpeername()s.send(msg)#异常事件elif flag & select.POLLERR:print "  exception on" , s.getpeername()poller.unregister(s)s.close()del message_queues[s]</span>
Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket, select
import Queueserversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
serversocket.bind(server_address)
serversocket.listen(1)
print  "服务器启动成功,监听IP:" , server_address
serversocket.setblocking(0)
timeout = 10
#新建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#添加服务器监听fd到等待读事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues = {}fd_to_socket = {serversocket.fileno():serversocket,}
while True:print "等待活动连接......"#轮询注册的事件集合events = epoll.poll(timeout)if not events:print "epoll超时无活动连接,重新轮询......"continueprint "有" , len(events), "个新事件,开始处理......"for fd, event in events:socket = fd_to_socket[fd]#可读事件if event & select.EPOLLIN:#如果活动socket为服务器所监听,有新连接if socket == serversocket:connection, address = serversocket.accept()print "新连接:" , addressconnection.setblocking(0)#注册新连接fd到待读事件集合epoll.register(connection.fileno(), select.EPOLLIN)fd_to_socket[connection.fileno()] = connectionmessage_queues[connection]  = Queue.Queue()#否则为客户端发送的数据else:data = socket.recv(1024)if data:print "收到数据:" , data , "客户端:" , socket.getpeername()message_queues[socket].put(data)#修改读取到消息的连接到等待写事件集合epoll.modify(fd, select.EPOLLOUT)#可写事件elif event & select.EPOLLOUT:try:msg = message_queues[socket].get_nowait()except Queue.Empty:print socket.getpeername() , " queue empty"epoll.modify(fd, select.EPOLLIN)else :print "发送数据:" , data , "客户端:" , socket.getpeername()socket.send(msg)#关闭事件elif event & select.EPOLLHUP:epoll.unregister(fd)fd_to_socket[fd].close()del fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/497116.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

沉下来,沉下来,别让自己太浮躁

沉下来,沉下来,别让自己太浮躁路虽远,行则将至事虽难,做则必成 转载于:https://www.cnblogs.com/ttyttt/archive/2007/06/04/771071.html

Abstract Server、Adapter 和 Bridge 模式

Abstract Server、Adapter 和 Bridge 模式

Loguru:Python 日志终极解决方案

From&#xff1a;https://blog.csdn.net/kdl_csdn/article/details/121146354 1、日志的重要性 日志的作用非常重要&#xff0c;日志可以记录用户的操作、程序的异常&#xff0c;还可以为数据分析提供依据&#xff0c;日志的存在意义就是为了能够在程序在运行过程中记录错误&am…

全球机器换人排行榜!这八个国家遥遥领先

来源&#xff1a; 机器人创新生态概要&#xff1a;如今&#xff0c;世界各国都在进行机器换人&#xff0c;希望把人力劳动从低端工作岗位释放出来&#xff0c;制造业自动化水平越来越高&#xff0c;工厂利用工业机器人获得了更低的成本、更高的效率和更快的生产速度。人力成本的…

不是人问的

我居然问了一个不是人问的问题。。。。。。看来我太有问题了。。。。。。 转载于:https://www.cnblogs.com/iceice1986/archive/2007/06/12/780814.html

C++学习之路 | PTA(甲级)—— 1043 Is It a Binary Search Tree (25分)(带注释)(精简)

1043 Is It a Binary Search Tree (25分) A Binary Search Tree (BST) is recursively defined as a binary tree which has the following properties: The left subtree of a node contains only nodes with keys less than the node’s key. The right subtree of a node co…

Proxy 和 Stairway To Heaven 模式

Proxy 和 Stairway To Heaven 模式

C++学习之路 | PTA(甲级)—— 1064 Complete Binary Search Tree (30分)(带注释)(精简)

1064 Complete Binary Search Tree (30分) A Binary Search Tree (BST) is recursively defined as a binary tree which has the following properties: The left subtree of a node contains only nodes with keys less than the node’s key. The right subtree of a node c…

Python自省(反射) 与 inspect 模块

Python 自省指南&#xff1a;https://www.ibm.com/developerworks/cn/linux/l-pyint/ From&#xff1a;https://my.oschina.net/taisha/blog/55597 在计算机编程中&#xff0c;自省是指这种能力&#xff1a;检查某些事物以确定它是什么、它知道什么以及它能做什么。即 列出对…

Snap Shots 出了新东西

今天收到 Snap Shots 的邮件&#xff0c;说是他出了新产品。我看了一下&#xff0c;是两个&#xff0c;一个是 RSS 聚合&#xff0c;很好的东西&#xff0c;以后可以预览 RSS 了。另外一个是 Profile preview。感觉第二个对我的吸引大一点&#xff0c;如果可以做一个网站专门提…

视频监控成AI芯片主战场,海康威视和大华股份占据半壁江山

来源&#xff1a;MEMS概要&#xff1a;图像和视频的人工智能处理&#xff0c;是目前AI芯片商业化前景最乐观的赛道&#xff0c;也是玩家们弯道超车的最佳机会。 图像和视频的人工智能处理&#xff0c;是目前AI芯片商业化前景最乐观的赛道&#xff0c;也是玩家们弯道超车的最佳机…

需求分析设计

规格说明 1、有些雇员时钟点工。会按照他们雇员记录中每小时报酬字段的值对他们进行支付。他们每天会提交工作时间卡&#xff0c;其中记录了日期以及工作小时数。如果他们每天工作超过8小时。那么超过的部分会按照正常报酬1.5倍进行支付。每周五对他们进行支付。 2、有些雇员…

C++学习之路 | PTA(甲级)—— 1099 Build A Binary Search Tree (30分)(带注释)(精简)

1099 Build A Binary Search Tree (30分) A Binary Search Tree (BST) is recursively defined as a binary tree which has the following properties: The left subtree of a node contains only nodes with keys less than the node’s key. The right subtree of a node co…

python 命令行 解析模块 optparse、argparse

optparse&#xff1a;https://docs.python.org/zh-cn/3/library/optparse.htmlargparse &#xff1a;https://docs.python.org/zh-cn/3/library/argparse.html3.2 版后已移除 optparse 模块&#xff0c;并且将不再继续开发&#xff1b;开发转至 argparse 模块进行。 ​1、argpa…

构建插件式的应用程序框架(六)----通讯机制(ZT)

前天发了构建插件式的应用程序框架(五)&#xff0d;&#xff0d;&#xff0d;&#xff0d;管理插件这篇文章&#xff0c;有几个朋友在回复中希望了解插件之间是如何通讯的。这个系列的文章写到这里&#xff0c;也该谈谈这个问题了&#xff0c;毕竟已经有了插件管理。不知道大家…

人工智能产业2018年待解的三大难题

来源&#xff1a;人民邮电报概要&#xff1a;2017年&#xff0c;人工智能领域在算法、政策、资金等方面已经出现了三大突破&#xff0c;业界欢欣鼓舞的情形很像1999年年底网络泡沫泛滥时的情形。2017年&#xff0c;人工智能领域在算法、政策、资金等方面已经出现了三大突破&…

矩阵连乘问题(c++)

矩阵连乘问题 问题描述&#xff1a; 给定n个矩阵&#xff1a;A1,A2,…,An&#xff0c;其中Ai与Ai1是可乘的&#xff0c;i1&#xff0c;2…&#xff0c;n-1。确定计算矩阵连乘积的计算次序&#xff0c;使得依此次序计算矩阵连乘积需要的数乘次数最少。输入数据为矩阵个数和每个矩…

我在犹豫是不是该收集这几首MP3

我在犹豫是不是该收集这几首MP3&#xff0c;确实&#xff0c;曲子歌词都非常不错。可我找不到收藏的理由。总是一个想法&#xff0c;让我放弃和错过了很多东西&#xff1a;我已经错过和失去了很多&#xff0c;这一次又算什么呢&#xff1f;这样的想法让我在面对很多人或事都已经…

QuestMobile 2017年中国移动互联网年度报告

来源&#xff1a;QuestMobile2017年&#xff0c;科技的风口兜兜转转&#xff0c;从直播、VR到AI再到区块链、短视频泛娱乐IP&#xff0c;最终在2017年底定格在了知识付费上&#xff0c;然而这并没有结束&#xff0c;紧随知识付费而来的就是撒币、大撒币……这就是中国移动互联网…

Python 读写配置文件模块: configobj 和 configParser

参考&#xff1a;http://www.voidspace.org.uk/python/configobj.html Python模块之ConfigParser - 读写配置文件&#xff1a;http://www.cnblogs.com/victorwu/p/5762931.html Python 官网 configparser 文档&#xff1a;https://docs.python.org/3.7/library/configparser.…