6.python网络编程

文章目录

    • 1.生产者消费者-生成器版
    • 2.生产者消费者--异步版本
    • 3.客户端/服务端-多线程版
    • 4.IO多路复用TCPServer模型
      • 4.1Select
      • 4.2Epoll
    • 5.异步IO多路复用TCPServer模型

1.生产者消费者-生成器版

import time# 消费者
def consumer():cnt = yieldwhile True:if cnt <= 0:# 暂停、让出CPUcnt = yield cntcnt -= 1time.sleep(1)print('consumer consum 1 cnt. cnt =', cnt)# 生产者 (调度器)
def producer(cnt):gen = consumer()# 激活生成器next(gen)gen.send(cnt)while True:cnt += 1print('producer producer 5 cnt. cnt =', cnt)# 调度消费者current = int(time.time())if current % 5 == 0:cnt = gen.send(cnt)else:time.sleep(1)if __name__ == '__main__':producer(0)

2.生产者消费者–异步版本

import asyncio
import time
from queue import Queue
from threading import Threaddef start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_sleep(x, queue):await asyncio.sleep(x)queue.put('ok')def consumer(input_queue1, out_queue1):while True:task = input_queue1.get()if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_sleep(int(task), out_queue1), new_loop)if __name__ == '__main__':print(time.ctime())new_loop = asyncio.new_event_loop()loop_thread = Thread(target=start_loop, args=(new_loop,))loop_thread.daemon = Trueloop_thread.start()input_queue = Queue()input_queue.put(5)input_queue.put(3)input_queue.put(1)out_queue = Queue()consumer_thread = Thread(target=consumer, args=(input_queue, out_queue,))consumer_thread.daemon = Trueconsumer_thread.start()while True:msg = out_queue.get()print("协程运行完...")print("当前时间:", time.ctime())

3.客户端/服务端-多线程版

客户端/服务模型

在这里插入图片描述

客户端

# -*- encoding=utf-8 -*-
# 客户端import socketclient = socket.socket()
print('client.fileno:', client.fileno())client.connect(('127.0.0.1', 8999))while True:content = str(input('>>>'))client.send(content.encode())content = client.recv(1024)print('client recv content:', content)

服务端

import socket
import threadingdef thread_process(s):while True:content = s.recv(1024)if len(content) == 0:breaks.send(content.upper())print(str(content, encoding='utf-8'))  # 接受来自客户端的消息,并打印出来s.close()server = socket.socket()  # 1. 新建socket
server.bind(('127.0.0.1', 8999))  # 2. 绑定IP和端口(其中127.0.0.1为本机回环IP)
server.listen(5)  # 3. 监听连接while True:s, addr = server.accept()  # 4. 接受连接new_thread = threading.Thread(target=thread_process, args=(s,))print('new thread process connect addr:{}'.format(addr))new_thread.start()

注意:

  • AddressFamily=AF_INET:(用于 Internet 进程间通信)

  • AddressFamily=AF_UNIX(用于同一台机器进程间通信)

  • 现象:报错[WinError 10038],原因分析:socket 先 close 再调 recv 就会报错,解决办法:if not tcpCliSock._closed:

4.IO多路复用TCPServer模型

4.1Select

服务端

import select
import socket
from queue import Queue, Empty
from time import sleepserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server_address = ("127.0.0.1", 8999)
print('starting up on %s port %s' % server_address)
server.bind(server_address)
server.listen(5)
inputs = [server]
outputs = []
message_queues = {}while inputs:print('waiting for the next event')readable, writable, exceptional = select.select(inputs, outputs, inputs)for s in readable:if s is server:connection, client_address = s.accept()print(f"connection from {client_address}")connection.setblocking(False)inputs.append(connection)message_queues[connection] = Queue()continuedata = s.recv(1024).decode()if data == "":print(f'closing:{s.getpeername()}')if s in outputs:outputs.remove(s)inputs.remove(s)s.close()message_queues.pop(s)continueprint(f'received {data} from {s.getpeername()} ')message_queues[s].put(data)if s not in outputs:outputs.append(s)for s in writable:try:queue_item = message_queues.get(s)send_data = ''if queue_item:send_data = queue_item.get_nowait()except Empty:print(outputs.remove(s))print(f"{s.getpeername()} has closed")else:if queue_item:s.send(send_data.encode())for s in exceptional:print(f"Exception condition on {s.getpeername}")inputs.remove(s)if s in outputs:outputs.remove(s)s.close()message_queues.pop(s)sleep(1)

客户端

import socketmessages = ['This is the message ', 'It will be sent ', 'in parts ', ]
server_address = ("127.0.0.1", 8999)
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ]
print('connecting to %s port %s' % server_address)
for s in socks:s.connect(server_address)for index, message in enumerate(messages):for s in socks:print('%s: sending "%s"' % (s.getsockname(), message + str(index)))s.send((message + str(index)).encode('utf-8'))for s in socks:data = s.recv(1024)print('%s: received "%s"' % (s.getsockname(), data))if data != "":print('closing socket', s.getsockname())s.close()
  • 为什么要将server放入到inputs中

在select模型中,将server放入到inputs中,当执行select时就会去检查server是否可读,就说明在缓冲区里有数据,对于server来说,有连接进入。使用accept获得客户端socket文件后,首先要放入到inputs当中,等待其发送消息。

  • readable

select会将所有可读的socket返回,包括server在内,假设一个客户端socket的缓冲区里有2000字节的内容,而这一次你只是读取了1024个字节,没有关系,下一次执行select模型时,由于缓冲区里还有数据,这个客户端socket还会被放入到readable列表中。因此,在读取数据时,不必再像之前那样使用一个while循环一直读取。

  • writable

在每一次写操作执行后,都从socket从writable中删除,这样做的原因很简单,该写的数据已经写完了,如果不删除,下一次select操作时,又会把他放入到writable中,可是现在已经没有数据需要写了啊,这样做没有意义,只会浪费select操作的时间,因为它要遍历outputs中的每一个socket,判断他们是否可写以决定是否将其放入到writtable中

  • 异常

在exceptional中,是发生错误和异常的socket,有了这个数组,就在也不用操心错误和异常了,不然程序写起来非常的复杂,有了统一的管理,发生错误后的清理工作将变得非常简单

4.2Epoll

服务端

# -*- encoding=utf-8 -*-
# IO多路复用TCPServer模型import select
import socketdef serve():server = socket.socket()server.bind(('127.0.0.1', 8999))server.listen(1)epoll = select.epoll()epoll.register(server.fileno(), select.EPOLLIN)connections = {}contents = {}while True:events = epoll.poll(10)for fileno, event in events:if fileno == server.fileno():# 当fd为当前服务器的描述符时,获取新连接s, addr = server.accept()  # 获取套接字和地址print(f"new connection from addr:{addr},fileno:{s.fileno()},socket:{s}")epoll.register(s.fileno(), select.EPOLLIN)connections[s.fileno()] = selif event == select.EPOLLIN:# 当fd不为服务器描述符为客户端描述符时,读事件就绪,有新数据可读s = connections[fileno]content = s.recv(1024)if content:# 当客户端发送数据时print(f"recv content is {content}")print(f"fileno:{fileno} event:{event}")epoll.modify(fileno, select.EPOLLOUT)contents[fileno] = contentelse:# 当客户端退出连接时print(f"recv content is null")print(f"fileno;{fileno} event:{event} ")epoll.unregister(fileno)s.close()connections.pop(fileno)elif event == select.EPOLLOUT:# 当fd不为服务器描述符为客户端描述符时,写事件就绪try:content = contents[fileno]s = connections[fileno]s.send(content)epoll.modify(s.fileno(), select.EPOLLIN)print(f"modify content is {content}")print(f"fileno;{fileno} event:{event} ")except Exception as error:epoll.unregister(fileno)s.close()connections.pop(fileno)contents.pop(fileno)print(f"modify content is failed")print(f"fileno;{fileno} event:{event} ")if __name__ == '__main__':serve()

客户端

# -*- encoding=utf-8 -*-
# 客户端import socketclient = socket.socket()
print('client.fileno:', client.fileno())client.connect(('127.0.0.1', 8999))while True:content = str(input('>>>'))client.send(content.encode())content = client.recv(1024)print('client recv content:', content.decode())

5.异步IO多路复用TCPServer模型

import socket
import select
from collections import dequeclass Future:"""可等待对象 Future"""def __init__(self, loop):self.loop = loopself.done = Falseself.co = Nonedef set_done(self):self.done = Truedef set_coroutine(self, co):self.co = codef __await__(self):if not self.done:yield selfreturnclass SocketWrapper:"""套接字协程适配器"""def __init__(self, sock: socket.socket, loop):self.loop = loopself.sock = sockself.sock.setblocking(False)self.fileno = self.sock.fileno()def create_future_for_events(self, events):future: Future = Future(loop=self.loop)def handler():future.set_done()self.loop.unregister_handler(self.fileno)if future.co:self.loop.add_coroutine(future.co)self.loop.register_handler(self.fileno, events, handler)return futureasync def accept(self):while True:try:sock, addr = self.sock.accept()return SocketWrapper(sock, self.loop), addrexcept BlockingIOError:future = self.create_future_for_events(select.EPOLLIN)await futureasync def recv(self, backlog):while True:try:return self.sock.recv(backlog)except BlockingIOError:future = self.create_future_for_events(select.EPOLLIN)await futureasync def send(self, data):while True:try:return self.sock.send(data)except BlockingIOError:future = self.create_future_for_events(select.EPOLLOUT)await futureclass EventLoop:"""调度器:epoll事件驱动"""current = Nonerunnable = deque()epoll = select.epoll()handler = {}@classmethoddef instance(cls):if not EventLoop.current:EventLoop.current = EventLoop()return EventLoop.currentdef register_handler(self, fileno, events, handler):self.handler[fileno] = handlerself.epoll.register(fileno, events)def unregister_handler(self, fileno):self.epoll.unregister(fileno)self.handler.pop(fileno)def add_coroutine(self, co):self.runnable.append(co)def run_coroutine(self, co):try:future: Future = co.send(None)future.set_coroutine(co)except Exception as e:print(e)print('coroutine {} stopped'.format(co.__name__))def run_forever(self):while True:while self.runnable:self.run_coroutine(co=self.runnable.popleft())events = self.epoll.poll(1)for fileno, event in events:handler = self.handler.get(fileno)handler()class TCPServer:def __init__(self, loop: EventLoop):self.loop = loopself.listen_sock: SocketWrapper = self.create_listen_socket()self.loop.add_coroutine(self.serve_forever())def create_listen_socket(self, ip='localhost', port=8999):sock = socket.socket()sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind((ip, port))sock.listen()return SocketWrapper(sock, self.loop)async def handler_client(self, sock: SocketWrapper):while True:data = await sock.recv(1024)if not data:print('client disconnected')breakawait sock.send(data.upper())async def serve_forever(self):while True:sock, addr = await self.listen_sock.accept()print(f'client connect addr = {addr}')self.loop.add_coroutine(self.handler_client(sock))if __name__ == '__main__':loop = EventLoop.instance()server = TCPServer(loop)loop.run_forever()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/5874.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能家居|基于SprinBoot+vue的智能家居系统(源码+数据库+文档)

智能家居目录 基于SprinBootvue的智能家居系统 一、前言 二、系统设计 三、系统功能设计 1管理员&#xff1a;个人中心管理功能的详细实现 2管理员&#xff1a;用户信息管理功能的详细实现 3管理员&#xff1a;家具管理功能的详细实现 4管理员&#xff1a;任务管理功能…

Python绘制的好看统计图

代码 sx [Accent, Accent_r, Blues, Blues_r, BrBG, BrBG_r, BuGn, BuGn_r, BuPu, BuPu_r, CMRmap, CMRmap_r, Dark2, Dark2_r, GnBu, GnBu_r, Greens, Greens_r, Greys, Greys_r, OrRd, OrRd_r, Oranges, Oranges_r, PRGn, PRGn_r, Paired, Paired_r, Pastel1, Pastel1_r, P…

CSAPP | Floating Point

CSAPP | Floating Point b i b_i bi​ b i − 1 b_{i-1} bi−1​ … b 2 b_2 b2​ b 1 b_1 b1​ b 0 b_0 b0​ b − 1 b_{-1} b−1​ b − 2 b_{-2} b−2​ b − 3 b_{-3} b−3​ … b − j b_{-j} b−j​ S ∑ k − j i b k 2 k S\sum_{k-j}^{i}b_k\times2^k S∑k…

了解一下创新奇智的 Orion 分布式机器学习平台

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 创新奇智的 Orion 分布式机器学习平台是一个企业级的端到端机器学习解决方案&#xff0c;它通过整合智能资源调度中心&#xff08;IRC&#xff09;、智能数据自动化中心&#xff08;DAC&#xff09;和自…

HarmonyOS 4.0(鸿蒙开发)01 - 怎么学习鸿蒙引导篇

作为公司的全栈开发工程师 以及 未来的发展是有鸿蒙这个阶段的&#xff0c;以及本身具有这个技术栈由此后续会分享自己在实战中学习到的东西&#xff0c;碰到的bug都会分享出来&#xff0c;这是引导篇期待后续的更新 学习目标&#xff1a; 理解HarmonyOS操作系统的架构和开发…

三维坐标点按剖面分类

一、写在前面 ①配套文件&#xff1a;根据剖面对三维坐标点&#xff08;X,Y,Z&#xff09;分类资源-CSDN文库 ②脱敏处理&#xff1a;蚀变数据已采用随机数生成覆盖 ③剖面坐标按顺序排列在“剖面坐标点.xlsx”文件中 二、3点确定空间中平面方程 原理&#xff1a; 设3点A&…

YOLOv8主要命令讲解

YOLOv8主要有三个常用命令&#xff0c;分别是&#xff1a;train&#xff08;训练&#xff09;、predict&#xff08;预测&#xff09;、export&#xff08;转化模型格式&#xff09;&#xff0c;下面我将展开讲讲三个常用命令的常用参数与具体使用方法。 一、训练 通过自己标…

Docker容器---Harbor私有仓库部署与管理

一、搭建本地私有仓库 1、本地私有仓库简介 有时候使用Docker Hub这样的公共仓库可能不方便&#xff0c;这种情况下用户可以使用registry创建一个本地仓库供私人使用&#xff0c;这点跟Maven的管理类似。 2、使用私有仓库的优点 节省网络带宽&#xff0c;针对于每个镜像不用…

知乎广告开户流程,知乎广告的优势是什么?

社交媒体平台不仅是用户获取知识、分享见解的场所&#xff0c;更是品牌展示、产品推广的重要舞台。知乎作为国内知名的知识分享社区&#xff0c;以其高质量的内容生态和庞大的用户基础&#xff0c;成为了众多企业进行广告投放的优选之地。云衔科技通过其专业服务&#xff0c;助…

【ZIP技巧】zip压缩包太大,怎么缩小?

如果文件压缩成zip压缩包&#xff0c;但是压缩之后&#xff0c;体积仍然很大&#xff0c;该如何解决呢&#xff1f;今天分享ZIP压缩包太大的几个缩小方法。 方法一&#xff1a; 适当减少文件内的不必要文件。 方法二&#xff1a; 使用7-zip格式压缩包&#xff0c;会比zip格…

Python异步Redis客户端与通用缓存装饰器

前言 这里我将通过 redis-py 简易封装一个异步的Redis客户端&#xff0c;然后主要讲解设计一个支持各种缓存代理&#xff08;本地内存、Redis等&#xff09;的缓存装饰器&#xff0c;用于在减少一些不必要的计算、存储层的查询、网络IO等。 具体代码都封装在 HuiDBK/py-tools: …

【PHP】安装指定版本Composer

1、下载指定版本composer.phar文件&#xff1a;https://github.com/composer/composer/releases 2、将下载的文件添加到全局路径&#xff1a; sudo mv composer.phar /usr/local/bin/composer 3、赋予权限&#xff1a; sudo chmod x /usr/local/bin/composer 4、查看compos…

Linux进程——进程的创建(fork的原理)

前言&#xff1a;在上一篇文章中&#xff0c;我们已经会使用getpid/getppid函数来查看pid和ppid,本篇文章会介绍第二种查看进程的方法&#xff0c;以及如何创建子进程&#xff01; 本篇主要内容&#xff1a; 查看进程的第二种方法创建子进程系统调用函数fork 在开始前&#xff…

一文了解双向链表

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、链表分类二、双向链表是什么&#xff1f;三、功能函数实现1.申请一个节点2.初始化3.尾插4.头插5.尾删6.头删7.在指定位置后插入8.删除指定位置数据9.查找10…

带环链表问题

带环链表就是字面意思带环的链表&#xff0c;例如以下这三种情况 练习题 1.给定一个链表&#xff0c;判断链表中是否带环. - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a;快慢指针&#xff0c;慢指针走一步&#xff0c;快指针走两步&#xff0c;两个指针从链表的起…

nginx的前世今生(二)

书接上回&#xff1a; 上回书说到&#xff0c;nginx的前世今生&#xff0c;这回我们继续说 3.缓冲秘籍&#xff0c;洪流控水 Nginx的缓冲区是其处理数据传输和提高性能的关键设计之一&#xff0c;主要用于暂存和管理进出的数据流&#xff0c;以应对不同组件间速度不匹配的问题…

池化整合多元数据库,zData X 一体机助力证券公司IT基础架构革新

引言 近期&#xff0c;云和恩墨 zData X 多元数据库一体机&#xff08;以下简称 zData X&#xff09;在某证券公司的OA、短信和CRM业务系统中成功上线&#xff0c;标志着其IT基础架构完成从集中式存储向池化高性能分布式存储的转变。zData X 成功整合了该证券公司使用的达梦、O…

Windows php 安装 Memcached扩展、php缺失 Memcached扩展、Class ‘Memcached‘ not found

在Windows系统下如何安装 php Memcached 扩展 下载dll文件 pecl地址&#xff1a;https://pecl.php.net/package/memcached 根据版本进行选择 &#xff1a; 解压下载的文件后得到了这么样的文件结构&#xff1a; 配置 移动dll文件到相应文件位置 重点&#xff1a; libme…

FreeRTOS队列集(1-15)

队列集定义&#xff1a;def 队列集只允许任务间传递消息为同一种数据类型&#xff0c;如果需要在任务间传递不同数据类型的消息时&#xff0c;就可以使用队列集。 用于对多个信号量进行监听&#xff0c;其中不管哪一个消息到来&#xff0c;都可以让任务退出阻塞状态 假设&am…

如何利用MCU自动测量单元提高大坝安全监测效率

大坝作为重要的水利基础设施&#xff0c;其安全性直接关系到人民群众的生命财产安全和社会的稳定发展。因此&#xff0c;对大坝进行实时、准确的安全监测至关重要。近年来&#xff0c;随着微控制器单元(MCU)技术的不断发展&#xff0c;其在大坝安全监测领域的应用也越来越广泛。…