用Python做一个websocket服务端

我对websocket服务端的功能定义是:

1.能将client端的软硬件信息关联(如client_name和对应ip:port),且不支持重复关联;

2.可以判断接收自client端的消息属于哪种任务,并对应执行(如关联、发消息);

3.根据接收自client端的消息判断是点对点发送,还是广播发送,并执行。

参考其他大神们写的代码,我用python实现了上述功能需求:

import asyncio
import websockets
import json
import threading# 存储所有的客户端
Clients = []# 服务端
class WS_Server():def __init__(self):self.ip = "127.0.0.1"self.port = 9090# 发送消息async def sendMsg(self, websocket, msg):if websocket != None:await websocket.send(msg)else:self.broadcast(msg)await asyncio.sleep(0.2)# 群发消息async def broadcast(self, msg):for user in Clients:await user['socket'].send(msg)# 连接一个客户端,起一个循环监听async def echo(self, websocket):# 握手client_ip, client_port = websocket.remote_addressprint(f"连接到:{client_ip}:{client_port}")await websocket.send(json.dumps({"stat": "success"}))# 循环监听while True:try:recv_text = await websocket.recv()message = "收到消息: {}".format(recv_text)await websocket.send(message)data = json.loads(recv_text)receiver = data['to']content = data['content']sender = data['from']stat = data['stat']if stat == "link":checked = Falseif len(Clients) > 0 :for usr in Clients:if usr['name'] == sender:checked = Truebreakif checked:print(f"{client_ip}:{client_port}不能被重复关联")else:Clients.append({"socket": websocket, "name": sender})print(f"{sender} 已关联 {client_ip}:{client_port}")else:Clients.append({"socket": websocket, "name": sender})print(f"{sender} 已关联 {client_ip}:{client_port}")else:msg = json.dumps({"stat": stat, "to": receiver, "content": content, "from": sender})if receiver == "":await self.broadcast(msg)print('消息已群发')else:checked1 = Falsefor usr in Clients:if usr['name'] == receiver:socket = usr['socket']await self.sendMsg(socket, msg)checked1 = Truebreakif checked1:print(f"已给{receiver}发送消息")else:print(f"发送失败,未找到{receiver}")except websockets.ConnectionClosed:print(f"从{client_ip}:{client_port}断开连接")breakexcept websockets.InvalidState:print("无效状态")breakexcept Exception as e:print("ws连接报错", e)break# 启动服务器async def runServer(self):async with websockets.serve(self.echo, self.ip, self.port):await asyncio.Future()  # run forever# 多协程模式,防止阻塞主线程无法做其他事情def WebSocketServer(self):asyncio.run(self.runServer())# 多线程启动def startServer(self):# 多线程启动,否则会堵塞thread = threading.Thread(target=self.WebSocketServer)thread.start()# thread.join()if __name__=='__main__':s = WS_Server()s.startServer()print("ws服务已启动")

启动WS服务后,客户端需要按照如下数据格式发消息:

{"stat":"","to": "", "content": "", "from":""}

其中,stat包括link和send两种状态,to是消息接者者(receiver),from是消息发送者(sender)。若to为空,则表示该消息是广播消息;若to不为空,但对象不存在,则该消息不会被执行发送。

最后,附上我参考的两篇CSDN文章:

Python中websockets服务端从客户端接收消息并发送给多线程_python websocket 多线程-CSDN博客

python的websocket方法教程_python websocket-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61724.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Lua如何连接MySQL数据库?

大家好,我是袁庭新。使用Lua语言如何来连接数据库呢?新哥这篇文章给你安排上。 1 LuaSQL概述 LuaSQL是一个轻量级的Lua到数据库管理系统(DBMS)的接口库,由Kepler Project维护,且是开源的。它提供了一个简…

机器学习基础07

目录 1.逻辑回归 1.1原理 1.2API 2.K-Means 2.1算法过程 2.2API 3.SVM(支持向量机) 3.1算法原理​ 3.2API 1.逻辑回归 逻辑回归(Logistic Regression)是机器学习中的一种分类模型,逻辑回归是一种分类算法。 1.1原理 逻辑回归的输…

VMware Workstation 17.6.1

概述 目前 VMware Workstation Pro 发布了最新版 v17.6.1: 本月11号官宣:针对所有人免费提供,包括商业、教育和个人用户。 使用说明 软件安装 获取安装包后,双击默认安装即可: 一路单击下一步按钮: 等待…

解决 VMware 嵌套虚拟化提示 关闭“侧通道缓解“

最近给电脑做了新版的 Windows 11 LTSC操作系统,在启动VMware Workstation时,提示"此虚拟机已启用侧通道缓解,可增强安全性,但也会降低性能",但是我没有启用 Hyper-V 相关的任何功能以及 WSL, 从…

Java学习笔记--数组常见算法:数组翻转,冒泡排序,二分查找

目录 一,数组翻转 二,冒泡排序 三,二分查找(一尺之锤,日取其半,万世不竭) 一,数组翻转 1.概述:数组对称索引位置上的元素互换,最大值数组序号是数组长度减一 创建跳板…

数据结构-7.Java. 对象的比较

本篇博客给大家带来的是java对象的比较的知识点, 其中包括 用户自定义类型比较, PriorityQueue的比较方式, 三种比较方法...... 文章专栏: Java-数据结构 若有问题 评论区见 欢迎大家点赞 评论 收藏 分享 如果你不知道分享给谁,那就分享给薯条. 你们的支持是我不断创作的动力 .…

悬浮框元素定位

Web页面中调试悬浮元素的方法 在UI自动化测试或Web开发中,悬浮框的特点是鼠标进入时弹出,鼠标离开时消失。这种动态特性导致普通方法难以直接定位悬浮框内的元素。所以需要冻结页面或使用模拟Hover状态来进行调试。 方法一:冻结页面进行调试…

IDEA算法的详细介绍及Python实现

目录 IDEA算法的详细介绍及Python实现引言第一部分:IDEA算法的原理与背景1.1 IDEA算法的来源与特点1.2 IDEA算法的核心步骤第二部分:IDEA算法的Python实现(面向对象设计)2.1 核心类设计2.2 代码实现2.3 使用示例第三部分:案例1 - 函数优化问题(策略模式)3.1 问题描述3.2…

「Mac玩转仓颉内测版24」基础篇4 - 浮点类型详解

本篇将详细介绍 Cangjie 中的浮点类型,包括浮点数的表示方法、精度、舍入与溢出处理、科学计数法表示、字面量的进制表示、常用运算、类型转换及应用场景,帮助开发者掌握浮点数的使用方法。 关键词 浮点类型表示精度与舍入溢出与下溢科学计数法类型转换…

MAC借助终端上传jar包到云服务器

前提:保证工程本地已打包完成:图中路径即为项目的target目录下已准备好的jar包 第一步:打开终端(先不要连接自己的服务器),输入下面的上传命令: scp /path/to/local/app.jar username192.168.1…

el-select 和el-tree二次封装

前言 本文章是本人在开发过程中&#xff0c;遇到使用树形数据&#xff0c;动态单选或多选的需求&#xff0c;element中没有这种组件&#xff0c;故自己封装一个&#xff0c;欢迎多多指教 开发环境&#xff1a;element-UI、vue2 组件效果 单选 多选 组件引用 <treeselec…

基于Python Web的社区爱心养老管理系统设计与实现

摘 要 随着社会老龄化的加剧&#xff0c;养老问题日益凸显。为了解决社区养老服务的管理难题&#xff0c;本文提出了一种基于互联网技术的社区爱心养老管理系统。该系统采用B/S架构&#xff0c;结合Web前端技术和后端数据库技术&#xff0c;实现了对社区养老服务的全面管理。系…

在 Ubuntu 上使用 Traefik Proxy 为 Docker 容器设置反向代理

简介 Traefik&#xff08;发音为"traffic"&#xff09;是一个开源的反向代理和负载均衡器。它为微服务架构提供了网络入口&#xff0c;特别是在动态、服务密集的环境中&#xff08;如容器、微服务架构&#xff09;。由于其设计灵活且易于实施&#xff0c;Traefik 成…

有关博客博客系统的测试报告 --- 初次进行项目测试篇

文章目录 前言一、博客系统的项目背景二、博客系统的项目简介1.后端功能1.1 用户管理1.2 博客管理1.3 权限管理 2.前端功能2.1 用户界面 测试计划测试工具、环境设计的测试动作功能测试访问博客登录页面博客首页测试博客详情页博客编辑页 自动化测试自动化测试用例自动化测试脚…

Redis 性能优化 18招

前言 Redis在我们的日常开发工作中&#xff0c;使用频率非常高&#xff0c;已经变成了必不可少的技术之一。 Redis的使用场景也很多。 比如&#xff1a;保存用户登录态&#xff0c;做限流&#xff0c;做分布式锁&#xff0c;做缓存提升数据访问速度等等。 那么问题来了&…

Elasticsearch 开放推理 API 增加了对 IBM watsonx.ai Slate 嵌入模型的支持

作者&#xff1a;来自 Elastic Saikat Sarkar 使用 Elasticsearch 向量数据库构建搜索 AI 体验时如何使用 IBM watsonx™ Slate 文本嵌入。 Elastic 很高兴地宣布&#xff0c;通过集成 IBM watsonx™ Slate 嵌入模型&#xff0c;我们的开放推理 API 功能得以扩展&#xff0c;这…

ts- declare关键词及vue3报错“Window typeof globalThis”上不存在属性“nextLoading”、`

报错“Window & typeof globalThis”上不存在属性“nextLoading”、 代码环境&#xff1a;vue3、ts 阮一峰讲解 declarets 用法告诉编译器某个类型是存在的 下面的例子是脚本使用浏览器全局对象document。 declare var document; document.title "Hello";上面…

flume-将日志采集到hdfs

看到hdfs大家应该做什么&#xff1f; 是的你应该去把集群打开&#xff0c; cd /export/servers/hadoop/sbin 启动集群 ./start-all.sh 在虚拟机hadoop02和hadoop03上的conf目录下配置相同的日志采集方案&#xff0c;‘ cd /export/servers/flume/conf 切换完成之后&#…

已解决wordpress提示正在执行例行维护,请一分钟后回来

今天打开网站时提示“正在执行例行维护,请一分钟后回来”&#xff0c;一分钟后还这样&#xff0c;刷新也没用&#xff0c;这究竟是怎么回事了&#xff1f; 问题原因 这是WordPress在更新&#xff0c;wordpress在升级程序、主题、插件时&#xff0c;都会先切换到维护模式&…

TensorFlow如何调用GPU?

要在代码中使用 TensorFlow 的 GPU 功能&#xff0c;需要确保安装了支持 GPU 的 TensorFlow 版本&#xff0c;并正确配置了 CUDA 和 cuDNN。以下是如何调用和配置 TensorFlow 以使用 GPU 的步骤&#xff1a; 1. 安装 GPU 版本的 TensorFlow 首先&#xff0c;确保安装了 Tenso…