文章目录
- socket套接字
- 客户端/服务模型
- linux文件描述符fd
- Linux网络IO模型详解
- 网络服务器Apache VS Nginx
- 生产者消费者-生成器版
- 客户端/服务端-多线程版
- IO多路复用TCPServer模型
- 异步IO多路复用TCPServer模型
socket套接字
-
套接字(socket)是抽象概念,表示TCP连接的一端
-
通过套接字可以进行数据发送或接收 {IP:Port}==>> 套接字
-
TCP连接由两个套接字组成
={Socket1:Socket2}
={IP:Port}{IP:Port}
客户端/服务模型
linux文件描述符fd
Linux一切皆是文件,文件类型 socket.socket.fileno()
-
普通文件
-
目录文件
-
符号链接
-
设备文件
-
套接字
- 进程级的文件描述符表 :文件
- 系统级的文件描述符表 :外设
- 文件系统的i-Node表:目录
-
FIFO
Linux网络IO模型详解
阻塞式IO
非阻塞式IO
IO多路复用
信号驱动式IO
异步IO
五种IO比较
IO多路复用
-
select:线性扫描所有监听的文件描述符fd
-
poll:同选择性能有所优化
-
Epoll:使用红黑树管理数据结构,性能好
网络服务器Apache VS Nginx
Apache | Nginx |
---|---|
多线程 | 多路复用 |
资源占用大 | 更加轻量 |
模块多 | 模块化设计,社区活跃 |
稳定,缺陷少 | 静态资源、反向代理 |
性能较好 | 性能很好 |
生产者消费者-生成器版
import time# 消费者
def consumer():cnt = yieldwhile True:if cnt <= 0:# 暂停、让出CPUcnt = yield cntcnt -= 1time.sleep(1)print('consumer consum 1 cnt. cnt =', cnt)# 生产者 (调度器)
def producer(cnt):gen = consumer()# 激活生成器next(gen)gen.send(cnt)while True:cnt += 1print('producer producer 5 cnt. cnt =', cnt)# 调度消费者current = int(time.time())if current % 5 == 0:cnt = gen.send(cnt)else:time.sleep(1)if __name__ == '__main__':producer(0)
客户端/服务端-多线程版
客户端
# -*- encoding=utf-8 -*-
# 客户端import socketclient = socket.socket()
print('client.fileno:', client.fileno())client.connect(('127.0.0.1', 8999))while True:content = str(input('>>>'))client.send(content.encode())content = client.recv(1024)print('client recv content:', content)
服务端
import socket
import threadingdef thread_process(s):while True:content = s.recv(1024)if len(content) == 0:breaks.send(content.upper())print(str(content, encoding='utf-8')) # 接受来自客户端的消息,并打印出来s.close()server = socket.socket() # 1. 新建socket
server.bind(('127.0.0.1', 8999)) # 2. 绑定IP和端口(其中127.0.0.1为本机回环IP)
server.listen(5) # 3. 监听连接while True:s, addr = server.accept() # 4. 接受连接new_thread = threading.Thread(target=thread_process, args=(s,))print('new thread process connect addr:{}'.format(addr))new_thread.start()
IO多路复用TCPServer模型
# -*- encoding=utf-8 -*-
# IO多路复用TCPServer模型import select
import socketdef serve():server = socket.socket()server.bind(('127.0.0.1', 8999))server.listen(1)epoll = select.epoll()epoll.register(server.fileno(), select.EPOLLIN)connections = {}contents = {}while True:events = epoll.poll(10)for fileno, event in events:if fileno == server.fileno():# 当fd为当前服务器的描述符时,获取新连接s, addr = server.accept() # 获取套接字和地址print(f"new connection from addr:{addr},fileno:{s.fileno()},socket:{s}")epoll.register(s.fileno(), select.EPOLLIN)connections[s.fileno()] = selif event == select.EPOLLIN:# 当fd不为服务器描述符为客户端描述符时,读事件就绪,有新数据可读s = connections[fileno]content = s.recv(1024)if content:# 当客户端发送数据时print(f"recv content is {content}")print(f"fileno:{fileno} event:{event}")epoll.modify(fileno, select.EPOLLOUT)contents[fileno] = contentelse:# 当客户端退出连接时print(f"recv content is null")print(f"fileno;{fileno} event:{event} ")epoll.unregister(fileno)s.close()connections.pop(fileno)elif event == select.EPOLLOUT:# 当fd不为服务器描述符为客户端描述符时,写事件就绪try:content = contents[fileno]s = connections[fileno]s.send(content)epoll.modify(s.fileno(), select.EPOLLIN)print(f"modify content is {content}")print(f"fileno;{fileno} event:{event} ")except Exception as error:epoll.unregister(fileno)s.close()connections.pop(fileno)contents.pop(fileno)print(f"modify content is failed")print(f"fileno;{fileno} event:{event} ")if __name__ == '__main__':serve()
异步IO多路复用TCPServer模型
import socket
import select
from collections import dequeclass Future:"""可等待对象 Future"""def __init__(self, loop):self.loop = loopself.done = Falseself.co = Nonedef set_done(self):self.done = Truedef set_coroutine(self, co):self.co = codef __await__(self):if not self.done:yield selfreturnclass EventLoop:"""调度器:epoll事件驱动"""current = Nonerunnable = deque()epoll = select.epoll()handler = {}@classmethoddef instance(cls):if not EventLoop.current:EventLoop.current = EventLoop()return EventLoop.currentdef create_future(self):return Future(loop=self)def register_handler(self, fileno, events, handler):self.handler[fileno] = handlerself.epoll.register(fileno, events)def unregister_handler(self, fileno):self.epoll.unregister(fileno)self.handler.pop(fileno)def add_coroutine(self, co):self.runnable.append(co)def run_coroutine(self, co):try:future: Future = co.send(None)future.set_coroutine(co)except Exception as e:print(e)print('coroutine {} stopped'.format(co.__name__))def run_forever(self):while True:while self.runnable:self.run_coroutine(co=self.runnable.popleft())events = self.epoll.poll(1)for fileno, event in events:handler = self.handler.get(fileno)handler()class SocketWrapper:"""套接字协程适配器"""def __init__(self, sock: socket.socket, loop: EventLoop):self.loop = loopself.sock = sockself.sock.setblocking(False)@propertydef fileno(self):return self.sock.fileno()def create_future_for_events(self, events):future: Future = self.loop.create_future()def handler():future.set_done()self.loop.unregister_handler(self.fileno)if future.co:self.loop.add_coroutine(future.co)self.loop.register_handler(self.fileno, events, handler)return futureasync def accept(self):while True:try:sock, addr = self.sock.accept()return SocketWrapper(sock, self.loop), addrexcept BlockingIOError:future = self.create_future_for_events(select.EPOLLIN)await futureasync def recv(self, backlog):while True:try:return self.sock.recv(backlog)except BlockingIOError:future = self.create_future_for_events(select.EPOLLIN)await futureasync def send(self, data):while True:try:return self.sock.send(data)except BlockingIOError:future = self.create_future_for_events(select.EPOLLOUT)await futureclass TCPServer:def __init__(self, loop: EventLoop):self.loop = loopself.listen_sock: SocketWrapper = self.create_listen_socket()self.loop.add_coroutine(self.serve_forever())def create_listen_socket(self, ip='localhost', port=8999):sock = socket.socket()sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind((ip, port))sock.listen()return SocketWrapper(sock, self.loop)async def handler_client(self, sock:SocketWrapper):while True:data = await sock.recv(1024)if not data:print('client disconnected')breakawait sock.send(data.upper())async def serve_forever(self):while True:sock, addr = await self.listen_sock.accept()print(f'client connect addr = {addr}')self.loop.add_coroutine(self.handler_client(sock))if __name__ == '__main__':loop = EventLoop.instance()server = TCPServer(loop)loop.run_forever()