3. 网络并发模型
3.1 网络并发模型概述
-
什么是网络并发
在实际工作中,一个服务端程序往往要应对多个客户端同时发起访问的情况。如果让服务端程序能够更好的同时满足更多客户端网络请求的情形,这就是并发网络模型。
-
循环网络模型问题
循环网络模型只能循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再处理下一个。这样的网络模型虽然简单,资源占用不多,但是无法同时处理多个客户端请求就是其最大的弊端,往往只有在一些低频的小请求任务中才会使用。
3.2 多进程/线程并发模型
多进程/线程并发模中每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,客户端退出时再销毁该进程/线程,多任务并发模型也是实际工作中最为常用的服务端处理模型。
-
模型特点
- 优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求。
- 缺点: 资源消耗较大
- 适用情况:客户端请求较复杂,需要长时间占有服务器。
-
创建流程
- 创建网络套接字
- 等待客户端连接
- 有客户端连接,则创建新的进程/线程具体处理客户端请求
- 主进程/线程继续等待处理其他客户端连接
- 如果客户端退出,则销毁对应的进程/线程
多进程并发模型示例:"""
基于多进程的网络并发模型
重点代码 !!创建tcp套接字
等待客户端连接
有客户端连接,则创建新的进程具体处理客户端请求
父进程继续等待处理其他客户端连接
如果客户端退出,则销毁对应的进程
"""
from socket import *
from multiprocessing import Process
import sys# 地址变量
HOST = "0.0.0.0"
PORT = 8888
ADDR = (HOST, PORT)# 处理客户端具体请求
def handle(connfd):while True:data = connfd.recv(1024)if not data:breakprint(data.decode())connfd.close()# 服务入口函数
def main():# 创建tcp套接字tcp_socket = socket()tcp_socket.bind(ADDR)tcp_socket.listen(5)print("Listen the port %d"%PORT)# 循环连接客户端while True:try:connfd, addr = tcp_socket.accept()print("Connect from", addr)except KeyboardInterrupt:tcp_socket.close()sys.exit("服务结束")# 创建进程 处理客户端请求p = Process(target=handle, args=(connfd,),daemon=True)p.start()if __name__ == '__main__':main()
多线程并发模型示例:
"""
基于多线程的网络并发模型
重点代码 !!思路: 网络构建 线程搭建 / 具体处理请求
"""
from socket import *
from threading import Thread# 处理客户端具体请求
class Handle:# 具体处理请求函数 (逻辑处理,数据处理)def request(self, data):print(data)# 创建线程得到请求
class ThreadServer(Thread):def __init__(self, connfd):self.connfd = connfdself.handle = Handle()super().__init__(daemon=True)# 接收客户端的请求def run(self):while True:data = self.connfd.recv(1024).decode()if not data:breakself.handle.request(data)self.connfd.close()# 网络搭建
class ConcurrentServer:"""提供网络功能"""def __init__(self, *, host="", port=0):self.host = hostself.port = portself.address = (host, port)self.sock = self.__create_socket()def __create_socket(self):tcp_socket = socket()tcp_socket.bind(self.address)return tcp_socket# 启动服务 --> 准备连接客户端def serve_forever(self):self.sock.listen(5)print("Listen the port %d" % self.port)while True:connfd, addr = self.sock.accept()print("Connect from", addr)# 创建线程t = ThreadServer(connfd)t.start()if __name__ == '__main__':server = ConcurrentServer(host="0.0.0.0", port=8888)server.serve_forever() # 启动服务
ftp 文件服务器
【1】 分为服务端和客户端,要求可以有多个客户端同时操作。
【2】 客户端可以查看服务器文件库中有什么文件。
【3】 客户端可以从文件库中下载文件到本地。
【4】 客户端可以上传一个本地文件到文件库。
【5】 使用print在客户端打印命令输入提示,引导操作
参考代码:######################### 服务端 ############################
from socket import *
from threading import Thread
import os
from time import sleep# 文件库
FTP = "/home/tarena/FTP/"# 处理客户端具体请求
class Handle:def __init__(self, connfd):self.connfd = connfddef do_list(self):filelist = os.listdir(FTP)if filelist:self.connfd.send(b"OK")sleep(0.1)# 发送文件列表files = "\n".join(filelist)self.connfd.send(files.encode())else:self.connfd.send(b"FAIL")def do_get(self, filename):try:file = open(FTP + filename, 'rb')except:self.connfd.send(b"FAIL")else:self.connfd.send(b"OK")sleep(0.1)# 发送文件while True:data = file.read(1024)if not data:breakself.connfd.send(data)file.close()sleep(0.1)self.connfd.send(b"##")def do_put(self, filename):# 判断文件是否存在if os.path.exists(FTP + filename):self.connfd.send(b"FAIL")else:self.connfd.send(b"OK")# 接收文件file = open(FTP + filename, 'wb')while True:data = self.connfd.recv(1024)if data == b"##":breakfile.write(data)file.close()def request(self):while True:data = self.connfd.recv(1024).decode()# 分情况具体处理请求函数tmp = data.split(' ')if not data or tmp[0] == "EXIT":breakelif tmp[0] == "LIST":self.do_list()elif tmp[0] == "GET":# tmp-> [GET,filename]self.do_get(tmp[1])elif tmp[0] == "PUT":self.do_put(tmp[1])# 创建线程得到请求
class FTPThread(Thread):def __init__(self, connfd):self.connfd = connfdself.handle = Handle(connfd)super().__init__(daemon=True)# 接收客户端的请求def run(self):self.handle.request()self.connfd.close()# 网络搭建
class ConcurrentServer:"""提供网络功能"""def __init__(self, *, host="", port=0):self.host = hostself.port = portself.address = (host, port)self.sock = self.__create_socket()def __create_socket(self):tcp_socket = socket()tcp_socket.bind(self.address)return tcp_socket# 启动服务 --> 准备连接客户端def serve_forever(self):self.sock.listen(5)print("Listen the port %d" % self.port)while True:connfd, addr = self.sock.accept()print("Connect from", addr)# 创建线程t = FTPThread(connfd)t.start()if __name__ == '__main__':server = ConcurrentServer(host="0.0.0.0", port=8880)server.serve_forever() # 启动服务########################### 客户端 ###############################"""
文件服务器客户端
"""
from socket import *
import sys
from time import sleep# 具体发起请求,逻辑处理
class Handle:def __init__(self):self.server_address = ("127.0.0.1", 8880)self.sock = self.__connect_server()def __connect_server(self):tcp_socket = socket()tcp_socket.connect(self.server_address)return tcp_socketdef do_list(self):self.sock.send(b"LIST") # 发送请求response = self.sock.recv(1024) # 接收响应if response == b"OK":# 接收文件列表 file1\nfile2\n..files = self.sock.recv(1024 * 1024)print(files.decode())else:print("获取文件列表失败")def do_exit(self):self.sock.send(b"EXIT")self.sock.close()sys.exit("谢谢使用")def do_get(self, filename):request = "GET " + filenameself.sock.send(request.encode()) # 发送请求response = self.sock.recv(128) # 接收响应if response == b"OK":file = open(filename, 'wb')# 接收文件内容,写入文件while True:data = self.sock.recv(1024)if data == b"##":breakfile.write(data)file.close()else:print("该文件不存在")def do_put(self, filename):try:file = open(filename, 'rb')except:print("该文件不存在")else:filename = filename.split("/")[-1] # 获取文件名request = "PUT " + filenameself.sock.send(request.encode())response = self.sock.recv(128)if response == b"OK":# 发送文件while True:data = file.read(1024)if not data:breakself.sock.send(data)file.close()sleep(0.1)self.sock.send(b"##")else:print("上传失败")# 图形交互类
class FTPView:def __init__(self):self.__handle = Handle()def __display_menu(self):print()print("1. 查看文件")print("2. 下载文件")print("3. 上传文件")print("4. 退 出")print()def __select_menu(self):item = input("请输入选项:")if item == "1":self.__handle.do_list()elif item == "2":filename = input("要下载的文件:")self.__handle.do_get(filename)elif item == "3":filename = input("要上传的文件:")self.__handle.do_put(filename)elif item == "4":self.__handle.do_exit()else:print("请输入正确选项!")def main(self):while True:self.__display_menu()self.__select_menu()if __name__ == '__main__':ftp = FTPView()ftp.main() # 启动
3.3 IO并发模型
3.3.1 IO概述
-
什么是IO
在程序中存在读写数据操作行为的事件均是IO行为,比如终端输入输出 ,文件读写,数据库修改和网络消息收发等。
-
程序分类
- IO密集型程序:在程序执行中有大量IO操作,而运算操作较少。消耗cpu较少,耗时长。
- 计算密集型程序:程序运行中运算较多,IO操作相对较少。cpu消耗多,执行速度快,几乎没有阻塞。
-
IO分类:阻塞IO ,非阻塞IO,IO多路复用等。
3.3.2 阻塞IO
- 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。
- 效率:阻塞IO效率很低。但是由于逻辑简单所以是默认IO行为。
- 阻塞情况
- 因为某种执行条件没有满足造成的函数阻塞
e.g. accept input recv - 处理IO的时间较长产生的阻塞状态
e.g. 网络传输,大文件读写
- 因为某种执行条件没有满足造成的函数阻塞
3.3.3 非阻塞IO
- 定义 :通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态。
-
设置套接字为非阻塞IO
sockfd.setblocking(bool) 功能:设置套接字为非阻塞IO 参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
-
超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。
sockfd.settimeout(sec) 功能:设置套接字的超时时间 参数:设置的时间
非阻塞IO示例: """ 设置非阻塞的套接字 """ from socket import * from time import sleep, ctime# 日志文件模拟与网络无关IO file = open("my.log", "a")# 创建tcp套接字 sock = socket() sock.bind(("127.0.0.1", 8888)) sock.listen(5)# 设置为非阻塞 # sock.setblocking(False)# 设置超时事件 sock.settimeout(3)# 循环处理客户端连接 while True:try:connfd, addr = sock.accept()print("Connect from", addr)except timeout as e:# 模拟一个与accept 无关的事件msg = "%s : %s\n" % (ctime(), e)file.write(msg)except BlockingIOError as e:# 模拟一个与accept 无关的事件msg = "%s : %s\n" % (ctime(), e)file.write(msg)sleep(2)else:# accept 正常执行data = connfd.recv(1024)print(data.decode())
3.3.4 IO多路复用
-
定义
同时监控多个IO事件,当哪个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。
-
具体方案
- select方法 : Windows Linux Unix
- epoll方法: Linux
-
select 方法
rs, ws, xs=select(rlist, wlist, xlist[, timeout])
功能: 监控IO事件,阻塞等待IO发生
参数: rlist 列表 读IO列表,添加等待发生的或者可读的IO事件wlist 列表 写IO列表,存放要可以主动处理的或者可写的IO事件xlist 列表 异常IO列表,存放出现异常要处理的IO事件timeout 超时时间返回值: rs 列表 rlist中准备就绪的IOws 列表 wlist中准备就绪的IOxs 列表 xlist中准备就绪的IO
select 方法示例:
"""
IO多路复用 基础演示 select
"""
from select import select
from socket import *# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)file = open("my.log",'r')udp_sock = socket(AF_INET,SOCK_DGRAM)print("开始监控IO")
rs,ws,xs = select([file,udp_sock],[file,udp_sock],[])
print("rlist:",rs)
print("wlist:",ws)
print("xlist:",xs)
- epoll方法
ep = select.epoll()
功能 : 创建epoll对象
返回值: epoll对象
ep.register(fd,event)
功能: 注册关注的IO事件
参数:fd 要关注的IOevent 要关注的IO事件类型常用类型EPOLLIN 读IO事件(rlist)EPOLLOUT 写IO事件 (wlist)EPOLLERR 异常IO (xlist)e.g. ep.register(sockfd,EPOLLIN|EPOLLERR)ep.unregister(fd)
功能:取消对IO的关注
参数:IO对象或者IO对象的fileno
events = ep.poll()
功能: 阻塞等待监控的IO事件发生
返回值: 返回发生的IOevents格式 [(fileno,event),()....]每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
epoll方法示例:
"""
IO多路复用 基础演示 epoll
"""
from select import *
from socket import *# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)file = open("my.log",'r+')udp_sock = socket(AF_INET,SOCK_DGRAM)# 创建epoll对象
ep = epoll()# 关注IO对象
ep.register(tcp_sock,EPOLLIN)
ep.register(udp_sock,EPOLLOUT|EPOLLERR)# 建立查找字典
map = {tcp_sock.fileno():tcp_sock,udp_sock.fileno():udp_sock,
}print("开始监控IO")
events = ep.poll()
print(events) # 就绪的IO# 不再关注
ep.unregister(udp_sock)
del map[udp_sock.fileno()]
- select 方法与epoll方法对比
- epoll 效率比select要高
- epoll 同时监控IO数量比select要多
- epoll 支持EPOLLET触发方式
3.3.5 IO并发模型
利用IO多路复用等技术,同时处理多个客户端IO请求。
-
优点 : 资源消耗少,能同时高效处理多个IO行为
-
缺点 : 只针对处理并发产生的IO事件
-
适用情况:HTTP请求,网络传输等都是IO行为,可以通过IO多路复用监控多个客户端的IO请求。
-
网络并发服务实现过程
【1】将套接字对象设置为关注的IO,通常设置为非阻塞状态。
【2】通过IO多路复用方法提交,进行IO监控。
【3】阻塞等待,当监控的IO有事件发生时结束阻塞。
【4】遍历返回值列表,确定就绪的IO事件类型。
【5】处理发生的IO事件。
【6】继续循环监控IO发生。
IO多路复用并发模型################################# select 方法 ####################################
"""
基于select的并发服务模型
使用函数完成
"""
from select import select
from socket import *# 服务器地址
HOST = "0.0.0.0"
PORT = 8888
ADDR = (HOST,PORT)# 监控列表
rlist = []
wlist = []
xlist = []# 处理客户端连接
def connect_client(sock):connfd, addr = sock.accept()print("Connect from", addr)connfd.setblocking(False)rlist.append(connfd) # 增加关注对象# 处理客户端消息
def handle_client(connfd):data = connfd.recv(1024)# 处理客户端退出if not data:rlist.remove(connfd) # 不再关注connfd.close()returnprint(data.decode())connfd.send(b"Thanks")def main():# 创建监听套接字sock = socket()sock.bind(ADDR)sock.listen(3)# 配合非阻塞IO防止网络中断带来的内部阻塞sock.setblocking(False)rlist.append(sock) # 初始监控的IO对象# 循环监控关注的IO发生while True:rs,ws,xs = select(rlist,wlist,xlist)for r in rs:if r is sock:connect_client(r) # 连接客户端else:handle_client(r) # 处理客户端消息if __name__ == '__main__':main()################################ epoll 方法 ################################
"""
基于epoll的并发服务模型
使用类实现
"""
from select import *
from socket import *class EpollServer:def __init__(self, host="", port=0):self.host = hostself.port = portself.address = (host, port)self.sock = self._create_socket()self.ep = epoll()self.map = {} # 查找字典def _create_socket(self):sock = socket()sock.bind(self.address)sock.setblocking(False)return sock# 处理客户端连接def _connect_client(self, fd):connfd, addr = self.map[fd].accept()print("Connect from", addr)connfd.setblocking(False)# 增加关注对象,设置边缘触发self.ep.register(connfd, EPOLLIN | EPOLLET)self.map[connfd.fileno()] = connfd # 维护字典# 处理客户端消息def _handle_client(self, fd):data = self.map[fd].recv(1024)# 处理客户端退出if not data:self.ep.unregister(fd) # 不再关注self.map[fd].close()del self.map[fd] # 从字典删除returnprint(data.decode())self.map[fd].send(b"Thanks")# 启动服务def serve_forever(self):self.sock.listen(3)print("Listen the port %d" % self.port)self.ep.register(self.sock, EPOLLIN) # 设置关注self.map[self.sock.fileno()] = self.sockwhile True:events = self.ep.poll()# 循环查看哪个IO发生就处理哪个for fd, event in events:if fd == self.sock.fileno():self._connect_client(fd)elif event == EPOLLIN:self._handle_client(fd)if __name__ == '__main__':ep = EpollServer(host="0.0.0.0", port=8888)ep.serve_forever() # 启动服务