unity 中使用zeroMq和Mqtt 进行通讯

最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验,但也没办法,只能直接上手了。大概架构如下

图片

1、整体设计

设计原则很简单,通过阻塞队列进行交互,因为要和第三方进行对接,这里通过topicbytes 进行交互,让他们自己转化 看下消息队列定义,也没啥,就是两个阻塞队列,通过队列和第三方交互。

public class MsgQueue 
{// 用于存储接收到的二进制消息private static BlockingCollection<Msg> recMessageQueue = new BlockingCollection<Msg>();private static BlockingCollection<Msg> sendMessageQueue = new BlockingCollection<Msg>();public static BlockingCollection<Msg> RecvMessageQueue{get { return recMessageQueue; }}public static BlockingCollection<Msg> SendMessageQueue{get { return sendMessageQueue; }}
}

2、zeroMq功能实现

这次的需求是和域控进行通信,主要使用发布订阅模式,也就是我本地需要一个client,一个server

2.1 zeroMq 介绍

第一次使用zeroMq ,稍微介绍下;ZeroMQ 是一个高性能的异步消息库,旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换,支持多种消息模式,能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ 的一些关键特性和概念:ZeroMQ 支持多种消息模式,包括:

  • 请求-响应(Req-Rep):客户端发送请求,服务器处理并回复。

  • 发布-订阅(Pub-Sub):发布者发布消息,订阅者接收感兴趣的消息。

  • 推送-拉取(Push-Pull):用于分布式任务处理,推送端将任务发送到拉取端。

  • 管道(Pipeline):将多个组件连接起来形成数据处理管道。简单说就是一个TCP通信的框架,可以在本地作为客户端和服务器 官方网站:https://zeromq.org/languages/csharp/

2.2 插件介绍

zeromq的在C#上主要是通过netMq库,这玩意好多年不更新了 具体地址:https://github.com/zeromq/netmq 这玩意折腾了好久,第一次上手,主要要注意版本,通过Nuget 安装

Install-Package NetMQ

不知道为什么我安装不成功,unity里还是无法使用,我直接拷贝了dllplugins拷贝的时候注意依赖项,总共有三个,要不然会报错

图片

2.3 代码实现

using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;public class ZeroStarter : MonoBehaviour
{private SubscriberSocket subscriber;private PublisherSocket publisherSocket;private bool isRunning = true;void Start(){AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作// 初始化发布者publisherSocket = new PublisherSocket();publisherSocket.Bind("tcp://*:5557");subscriber = new SubscriberSocket();subscriber.Connect("tcp://localhost:5556");// 启动发布和订阅任务Task.Run(() => StartPub());Task.Run(() => StartSubscriber());}private void StartPub(){while (isRunning){try{// 使用 BlockingCollection 的 Take 方法获取消息Msg message = MsgQueue.SendMessageQueue.Take();publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);}catch (Exception e){Debug.LogError("Error in StartPub: " + e.Message);}}publisherSocket?.Close(); // 确保发布套接字在退出时关闭}private void StartSubscriber(){subscriber.Subscribe("");while (isRunning){try{// 通过超时和 TryReceiveFrameString 检查订阅消息if (subscriber.TryReceiveFrameString(out string topic)){byte[] bytes = subscriber.ReceiveFrameBytes();MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));string str = Encoding.Default.GetString(bytes);Debug.Log($"{topic} {str}");}Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环}catch (Exception e){Debug.LogError("Error in StartSubscriber: " + e.Message);}}subscriber.Close(); // 手动关闭订阅者套接字}private void OnDestroy(){isRunning = false; // 设置标志位,通知任务退出// 确保发布套接字关闭publisherSocket?.Close();NetMQConfig.Cleanup(); // 清理 NetMQ}
}

这里有一个注意点就是zeroMq 不支持topic, 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险,也很奇怪,但是貌似是官方推荐的做法,不纠结,就这样吧。

3、Mqtt功能实现

3.1 emqx介绍

mqtt也是一个消息队列,主要是用在IOT,简单来说就是一个TCP服务器,消息头会小一些。适合不稳定的网络。我们用的是Emqx ,还行,挺好用。工具客户端使用的是mqttx,下载地址:https://mqttx.app/zh 官方网站:https://www.emqx.com/zh

3.2 unity插件

这里使用的是官方推荐的插件库 sdk 介绍地址:https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html

官方示例:https://github.com/emqx/MQTT-Client-Examples

我这里使用的是:https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库,也不起作用,不知道为毛,盲猜是版本的问题,打开上面库,下载之后自己编译 因为用unity,所以打开了mono的库

图片

编译之后

图片

将生成的dll拷贝到plugins

3.3 代码实现

using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;public class MqttStarter : MonoBehaviour
{// Start is called before the first frame updatevoid Start(){string ipAddr = "192.168.3.8";int emqxPort = 1883;string clientId = "csharpclientid";//服务器默认密码是这个string username = "username";string password = "pwd";MqttClient client = new MqttClient(ipAddr, emqxPort,  false, null, null, MqttSslProtocols.None);// register to message receivedclient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;client.Connect(clientId,username,password);// client.Subscribe(new string[] { "idse/cloud2veh/#" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });}private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e){MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));Debug.Log("mqtt--> " +e.Topic +"  ==== >" + Encoding.Default.GetString(e.Message));}
}

这里需要注意的是m2mqtt 会自动开启线程,不需要单独实现线程。其他的都是常规操作。

4、验证

先看下zeromq的收发

图片

验证下Mqtt的收发

图片

5、Java代码zeromq的使用

pom中加入

   <dependency><groupId>org.zeromq</groupId><artifactId>jeromq</artifactId><version>0.5.2</version></dependency>

zeromq 发布者

package org.example;/*** Hello world!**/
import org.zeromq.ZMQ;
import org.zeromq.ZContext;public class App {public static void main(String[] args) {int i = 0;// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个发布者socketZMQ.Socket publisher = context.createSocket(ZMQ.PUB);// 绑定到指定的端口String address = "tcp://*:5556";publisher.bind(address);System.out.println("Publisher started at " + address);// 持续发送消息int messageNumber = 0;while (!Thread.currentThread().isInterrupted()) {String message = "Message " + messageNumber;publisher.sendMore("t:"+ (messageNumber%2));publisher.send("abc".getBytes());System.out.println("Sent: " + message);// 增加消息计数并休眠1秒messageNumber++;Thread.sleep(1000);}// 关闭发布者socketpublisher.close();} catch (Exception e) {e.printStackTrace();}}
}

zeromq 订阅者

package org.example;import org.zeromq.ZMQ;
import org.zeromq.ZContext;import java.nio.ByteBuffer;public class ZmqSubscriber {public static void main(String[] args) {// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个订阅者socketZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);// 连接到发布者的地址String address = "tcp://localhost:5557"; // 确保使用正确的地址subscriber.connect(address);System.out.println("Subscriber connected to " + address);// 订阅所有消息subscriber.subscribe("".getBytes()); // 订阅所有消息,可以根据需要指定主题// 持续接收消息while (!Thread.currentThread().isInterrupted()) {// 接收消息String topic = subscriber.recvStr(0);byte[] message = subscriber.recv(0);ByteBuffer wrap = ByteBuffer.wrap(message);if (message != null) {System.out.println("Received: " + topic +"  " + wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());}}// 关闭订阅者socketsubscriber.close();} catch (Exception e) {e.printStackTrace();}}
}

6、总结

第一次搞zeromq的消息队列,和平常用的kafkarocketmq 差的很多,甚至完全不在同一个讨论方向。

还有一些需要研究

  • unitynuget的学习

  • c# 不同平台的学习 

  • 启动后台线程之后无法关闭,导致unity死掉。

byte[] bytes = BitConverter.GetBytes(66666);
  • C#侧生成的bytes 是小端序列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/58645.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

掌握ElasticSearch(六):分析过程

文章目录 一、什么是分析1. 字符过滤 (Character Filtering)2. 分词 (Breaking into Tokens)3. 词条过滤 (Token Filtering)4. 词条索引 (Token Indexing) 二、内置分析器分类1. 标准分析器 (Standard Analyzer)2. 简单分析器 (Simple Analyzer)3. 语言分析器 (Language Analyz…

LabVIEW Modbus通讯稳定性提升

在LabVIEW开发Modbus通讯程序时&#xff0c;通讯不稳定是一个常见问题&#xff0c;可能导致数据丢失、延迟或错误。为了确保通讯的可靠性&#xff0c;可以从多个角度进行优化&#xff0c;以下是一些有效的解决方案&#xff0c;结合实际案例进行分析。 1. 优化通讯参数设置 通讯…

数据库安装使用

文章目录 一、目的二、原理三、过程 一、目的 Mysql数据库的解压Mysql数据库的的配置Mysql数据库的使用 二、原理 MySQL是一个关系型数据库管理系统&#xff0c;在 WEB 应用方面&#xff0c;MySQL是最好的RDBMS (Relational Database Management System&#xff0c;关系数据…

实时数据处理:技术支持和优势

在当今快节奏的数字世界中&#xff0c;企业不断寻找在竞争中保持领先地位的方法。批量和近实时的数据处理方法已经无法满足企业对于数据处理速度要求了。因此实时数据处理出现&#xff0c;逐渐帮助企业获取更快速的决策能力。本文&#xff0c;我们将深入研究实时处理&#xff0…

HCIP--以太网交换安全(总实验)

实验背景 假如你是公司的网络管理员&#xff0c;为了提高公司网络安全性&#xff0c;你决定在接入交换机部署一些安全技术&#xff1a;端口隔、端口安全、DHCP snooping、IPSG。 实验拓扑图 实验的要求&#xff1a; 1.在R1、R2连接在GE0/0/1和GE0/0/2接口下&#xff0c;均划…

高考新出路,综合评价招生全攻略,让你低分高就进名校

在高考改革的浪潮中&#xff0c;综合评价招生模式逐渐兴起&#xff0c;成为众多考生和家长关注的焦点。那么&#xff0c;什么是综合评价呢&#xff1f;简单来说&#xff0c;就是综合考量高考成绩、高校考核结果、高中学业水平考试成绩、综合素质评价以及其他特殊要求等五个维度…

一个图像处理的实验设计

在Rafael Gonzalez和Richard Woods的《数字图像处理》中有一道这样的实验设计题&#xff0c;我发现特别适合说明多个阈值的全局阈值分割的示例。 我嫌他说话啰嗦&#xff0c;修改了一下作为考试题。 基本流程 图像分割 选取中间灰度级的区域标记。 2. 形态学后处理 开运…

【Pandas】使用 pandas 处理多层级 Excel 表头的实用指南

背景 在数据分析和业务报告中&#xff0c;Excel 文件广泛用于存储和展示信息&#xff0c;特别是在需要对数据进行分类和分组时。许多商业场景中&#xff0c;数据会以多层级表头的形式呈现&#xff0c;体现不同维度的信息。例如&#xff0c;在财务报表中&#xff0c;可能按年度…

【YApi】接口管理平台

一、简介 YApi 是一个用于前后端开发团队协作的 API 管理平台&#xff0c;帮助团队更加高效地进行 API 接口的设计、测试、文档管理和版本控制等工作。 YApi 主要功能&#xff1a; API 设计和管理&#xff1a;提供 API 设计和文档生成工具&#xff0c;使开发者能够轻松创建、…

ctfshow(159->162)--文件上传漏洞

Web159 考点&#xff1a; 前端校验MIME验证黑名单 思路&#xff1a; 上传.user.ini文件&#xff1a; 文件内容auto_prepend_fileshell.png 由于网页存在前端过滤&#xff0c;只允许上传.png文件,所以我们将文件名修改为.user.ini.png上传&#xff0c;然后抓包删除.png后缀名…

使用pytest单元测试框架执行单元测试

Pytest 是一个功能强大且灵活的 Python 单元测试框架&#xff0c;它使编写、组织和运行测试变得更加简单。以下是 Pytest 的一些主要特点和优点&#xff1a; 简单易用&#xff1a;Pytest 提供了简洁而直观的语法&#xff0c;使编写测试用例变得非常容易。它支持使用 assert 语…

可视化平台FineBI的安装及简单使用

本章知识简介 主线B: 安装FineBI 主线C: FineBI的使用. 本章目标&#xff1a; 1:了解FineBI的安装; [了解]"傻瓜式"安装 2:知道FineBI的使用流程; [了解]a.连接数据b.创建组件与分析数据c.可视化组件d.添加交互效果e.制作仪表板 3:知道如何连接数据; [重点]添加业…

推荐一款优秀的pdf编辑器:Ashampoo PDF Pro

Ashampoo PDF Pro是管理和编辑 PDF 文档的完整解决方案。程序拥有您创建、转换、编辑和保护文档所需的一切功能。根据需要可以创建特定大小的文档&#xff0c;跨设备可读&#xff0c;还可以保护文件。现在您还能像编辑Word文档一样编辑PDF! 软件特点 轻松处理文字 如 Microso…

云原生后端开发教程

云原生后端开发教程 引言 随着云计算的普及&#xff0c;云原生架构逐渐成为现代软件开发的主流。云原生不仅仅是将应用部署到云上&#xff0c;而是一种构建和运行应用的方式&#xff0c;充分利用云计算的弹性和灵活性。本文将深入探讨云原生后端开发的核心概念、工具和实践&a…

XQT_UI 组件|02| 按钮 XPushButton

XPushButton 使用文档 简介 XPushButton 是一个自定义的按钮类&#xff0c;基于 Qt 框架构建&#xff0c;提供了丰富的样式和功能选项。它允许开发者轻松创建具有不同外观和行为的按钮&#xff0c;以满足用户界面的需求。 特性 颜色设置&#xff1a;支持多种颜色选择。样式设…

Linux线程安全(二)条件变量实现线程同步

目录 条件变量 条件变量初始化和唤醒 键盘触发条件变量唤醒线程demo 条件变量的等待 条件变量定时等待demo 条线变量实现多线程间的同步 条件变量 条件变量是为了控制多个线程的同步工作而设计的 比如说一个系统中有多个线程的存在但有且仅有一个线程在工作&#xff0c…

python的lambda实用技巧

lambda表达式 lambda表达式是一种简化的函数表现形式&#xff0c;也叫匿名函数&#xff0c;可以存在函数名也可以不存在。 使用一行代码就可以表示一个函数&#xff1a; # 格式 lambda arg[参数] : exp[表现形式] # 无参写法 lambda : "hello" # 一般写法 lambda …

复现第一周24

1.[SWPUCTF 2021 新生赛]gift_F12 1&#xff09;打开题目 2&#xff09;看源码 3&#xff09;直接ctrl&#xff0b;f搜索flag 2.[SWPUCTF 2021 新生赛]nc签到 1&#xff09;开题 2&#xff09;下载附件用记事本打开 3&#xff09;打开kali使用nc连接代码 输入l\s命令绕过黑名…

开发流程初学者指南——需求分析

目录 从零开始理解需求分析什么是需求分析&#xff1f;需求分析的目标需求分析的基本原则需求分析的各个阶段需求分析的常用方法和工具编写需求文档总结 从零开始理解需求分析 需求分析是软件开发过程中不可或缺的一环&#xff0c;它帮助我们明确用户的需求&#xff0c;确保最…