Python中websockets服务端从客户端接收消息并发送给多线程

思路:

1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程

2.消息中心需要独立出来,websockets服务端放消息,主线程去消息

3.根据思路设计模块:

                        1.消息库

                        2.服务端

                        3.主线程

                        4.多线程

先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name

修改websocket_client.py中data的信息,发送不同消息

一、消息处理中心 message_base.py

根据设备,创建储存设备消息,提取设备消息的功能

from queue import Queueclass MessageBase:def __init__(self):self.data = dict()def add(self, device, data):if device in self.data:self.data[device].put(data)else:self.data[device] = Queue()self.data[device].put(data)def get(self, device):data_queue: Queue = self.data.get(device)if not data_queue or data_queue.empty():return Nonedata = data_queue.get()return dataif __name__ == '__main__':mb = MessageBase()mb.add("a", "asdasd")mb.add("a", "11111111")print(mb.data)data = mb.get("a")print(data)

二、服务端:websocket_server.py

从客户端接收消息,并存到消息中心

import asyncio
import json
import threading
import websockets
##
from message_base import MessageBaseclass WebServer:def __init__(self, host, port, message_base: MessageBase):self.host = hostself.port = portself.clients = []self.message_base = message_baseasync def echo(self, websocket, path):self.clients.append(websocket)client_ip, client_port = websocket.remote_addresswhile True:try:recv_text = await websocket.recv()data = json.loads(recv_text)device = data.get("device")if device:self.message_base.add(device, data)else:continueexcept websockets.ConnectionClosed:print("ConnectionClosed...")  # 链接断开self.clients.remove(websocket)breakexcept websockets.InvalidState:print("InvalidState...")  # 无效状态self.clients.remove(websocket)breakexcept Exception as e:print(e)def connect(self):print("连接成功!")asyncio.set_event_loop(asyncio.new_event_loop())start_server = websockets.serve(self.echo, self.host, self.port)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()def run(self):t = threading.Thread(target=self.connect)t.start()print("已启动!")if __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8001, mb)ws.run()

三、设备功能 device_function.py

每个设备对应的线程功能,可以统一也可以写多个功能

class DeviceFunc:def __init__(self, device_name, data):self.device_name = device_nameself.data = datadef show_data(self):if self.data:print(self.device_name, "收到消息:", self.data.get("value"))

四、主线程 main.py

初始化所有功能模块,并运行主线程

from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFuncclass MainThread:def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):self.message_base = message_baseself.websocket_server = websocket_serverself.device_list = device_listdef run_server(self):self.websocket_server.run()def run(self):self.run_server()while True:for device in self.device_list:try:# 开始根据设备即功能处理消息data = self.message_base.get(device)if not data:continuedf = DeviceFunc(device, data)df.show_data()except Exception as err:passif __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8000, mb)device_list = ["aa", "bb", "cc"]mt = MainThread(mb, ws, device_list)mt.run()

五、客户端

给服务端发送消息,测试用

import jsonimport websocketclass WebClient:def __init__(self, host, port):self.host = hostself.port = portself.conn = Noneself.flag = Falsedef connect(self):try:url = f"ws://{self.host}:{self.port}"self.conn = websocket.create_connection(url)self.flag = Trueprint("连接成功")except Exception as err:self.flag = Falseprint("连接失败", err)def close(self):self.conn.close()def recv(self):data = self.conn.recv(1024)print(data)def send(self, data):self.conn.send(data)print("发送成功")if __name__ == '__main__':host = "192.168.6.28"port = 8000ws = WebClient(host, port)if not ws.flag:ws.connect()data = {"device": "bb", "value": "123"}data = {"device": "cc", "value": "123"}data = json.dumps(data)ws.send(data)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/700836.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式学习-qt-Day3

嵌入式学习-qt-Day3 一、思维导图 二、作业 完善对话框,点击登录对话框,如果账号和密码匹配,则弹出信息对话框,给出提示”登录成功“,提供一个Ok按钮,用户点击Ok后,关闭登录界面,跳…

「Java开发指南」MyEclipse如何支持Spring Scaffolding?(三)

在上文中(点击这里回顾>>),主要为大家介绍了CRUD Scaffolding,本文将继续介绍应用程序的分层、代码助手等。 MyEclipse v2023.1.2离线版下载 3. 应用程序的分层 应用程序分层是应用程序开发领域中非常常见的体系结构方法…

C++之类作用域

目录 1、全局作用域 2、类作用域 2.1、设计模式之Pimpl 2.2、单例模式的自动释放 2.2.0、检测内存泄漏的工具valgrind 2.2.1、可以使用友元形式进行设计 2.2.2、内部类加静态数据成员形式 2.2.3、atexit方式进行 2.2.4、pthread_once形式 作用域可以分为类作用域、类名…

c++学习记录 STL基本概念

1、STL基本概念 STL(Standard Template Library,标准模板库)STL广义上分为:容器(container)算法(algorithm)迭代器(iterator)容器和算法之间通过迭代器进行无…

线程共享和非共享的资源及线程优缺点

注意:共享的内存地址空间中不包括栈;共享文件描述符表,表示,同一进程中线程可以操作同一文件。

猫头虎分享已解决Bug || TypeError: Cannot read property ‘props‘ of undefined (React)

博主猫头虎的技术世界 🌟 欢迎来到猫头虎的博客 — 探索技术的无限可能! 专栏链接: 🔗 精选专栏: 《面试题大全》 — 面试准备的宝典!《IDEA开发秘籍》 — 提升你的IDEA技能!《100天精通鸿蒙》 …

c++的类型转换方法

一、静态类型转换(static_cast) 静态类型的转换主要用于基本类型之间的转换,比如int类型转换为double类型。但是static_cast也可以支持上下行的转换(存在继承关系之间的转换) 基本类型之间的转换举例 上下行转换的举…

金航标电子位于广西柳州鹿寨县天线生产基地于大年正月初九开工了!!!

金航标kinghelm(www.kinghelm.com.cn)总部位于中国深圳市,兼顾技术、成本、管理、效率和可持续发展。东莞塘厦实验室全电波暗室、网络分析仪、高低温测试柜等仪器设备齐全,可进行高低温、双85等测试,独立完成产品的检测…

Linux内核中并发与竞争的处理方法:原子操作代码举例二

一. 简介 前面文章学习了Linux内核中处理并发与竞争的一种方法:原子操作,并编写代码说明原子操作中对整型变量的操作,文章地址如下: Linux内核中并发与竞争的处理方法:原子操作代码举例一-CSDN博客 本文学习原子操…

流畅的Python(十二)-继承的优缺点

一、核心要义 1. 子类化内置类型的缺点 2.多重继承和方法解析顺序 二、代码示例 1. 子类化内置类型的缺点 #!/usr/bin/env python # -*- coding: utf-8 -*- # Time : 2024/2/24 7:29 # Author : Maple # File : 01-子类化内置类型的问题.py # Software: PyCharm fr…

数字化转型导师坚鹏:数据安全法解读与政府数字化转型

网络安全法、数据安全法、个人信息保护法解读与政府数字化转型 课程背景: 很多机构存在以下问题: 不清楚网络安全法、数据安全法、个人信息保护法立法背景? 不知道如何理解网络安全法、数据安全法、个人信息保护法政策? 不…

高级RAG:使用RAGAs + LlamaIndex进行RAG评估,包括原理、图和代码

原文地址:Using RAGAs LlamaIndex for RAG evaluation 2024 年 2 月 5 日 如果您已经为实际的业务系统开发了检索增强生成(Retrieval Augmented Generation, RAG)应用程序,那么您可能会关心它的有效性。换句话说,您…

Golin 弱口令/漏洞/扫描/等保/基线核查的快速安全检查小工具

下载地址: 链接:https://pan.quark.cn/s/db6afba6de1f 主要功能 主机存活探测、漏洞扫描、子域名扫描、端口扫描、各类服务数据库爆破、poc扫描、xss扫描、webtitle探测、web指纹识别、web敏感信息泄露、web目录浏览、web文件下载、等保安全风险问题风险…

小雉系统uefi安全启动支持(微软认证)

概述 从windows8开始,微软以安全为由使用uefi替换了传统的bios引导系统,并从2021年启新生产的硬件不再支持传统bios,导致基于传统bios制作的小雉系统无法启动运行: uefi的安全启动原理是主板使用内置的微软公钥校验grub2等引导程序(未经过微软签名的grub2无法…

【前端素材】推荐优质后台管理系统Start Admin平台模板(附源码)

一、需求分析 后台管理系统是一种用于管理网站、应用程序或系统的工具,它通常作为一个独立的后台界面存在,供管理员或特定用户使用。下面详细分析后台管理系统的定义和功能: 1. 定义 后台管理系统是一个用于管理和控制网站、应用程序或系统…

nios ii开发随笔

错误一: d:/intelfpga/17.1/nios2eds/bin/gnu/h-x86_64-mingw32/bin/../lib/gcc/nios2-elf/5.3.0/../../../../../H-x86_64-mingw32/nios2-elf/bin/ld.exe: test.elf section .text will not fit in region ram_oc_xzs d:/intelfpga/17.1/nios2eds/bin/gnu/h-x86_6…

关于Spring中管理Bean的杂谈

关于Spring中管理Bean的杂谈 1 filter、监听器,不能直接被spring容器管理2 Listener监听器不能被Spring管理3 使用EasyExcel的自定义监听器,可使用Spring容器 1 filter、监听器,不能直接被spring容器管理 1 spring更多的是管理我们自己的各种组件,filter之类的是交…

鸿蒙OS应用开发之显示图片组件6

前面学习了怎么样让图片合适的大小来显示出来,达到最佳的布局显示图片。现在来学习PixelMap图片显示。PixelMap图片是指图片解码后无压缩的位图,用于图片显示或图片处理。 由于PixelMap图片是一种无压缩的图片,比较适合图片处理,比如从网络上加载图片之后,再进行处理再显示…

STM32-启用蜂鸣器

目录 1 、电路构成及原理图 2、编写实现代码 main.c beep.c beep.h 3、代码讲解 4、 烧录到开发板调试、验证代码 5、检验效果 本人使用的是朗峰 STM32F103 系列开发板,此笔记基于这款开发板记录。 1 、电路构成及原理图 首先,通过朗峰 F1 开…

SpringBoot 3 新特性

目录 1. GraalVM2. 支持虚拟线程3. HTTP Interface 1. GraalVM 使用GraalVM将SpringBoot应用程序编译成本地可执行的镜像文件,可以显著提升启动速度、峰值性能以及减少内存应用。传统的应用都是编译成字节码,然后通过JVM解释并最终编译成机器码来运行&a…