最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验,但也没办法,只能直接上手了。大概架构如下
1、整体设计
设计原则很简单,通过阻塞队列进行交互,因为要和第三方进行对接,这里通过topic
和 bytes
进行交互,让他们自己转化 看下消息队列定义,也没啥,就是两个阻塞队列,通过队列和第三方交互。
public class MsgQueue
{// 用于存储接收到的二进制消息private static BlockingCollection<Msg> recMessageQueue = new BlockingCollection<Msg>();private static BlockingCollection<Msg> sendMessageQueue = new BlockingCollection<Msg>();public static BlockingCollection<Msg> RecvMessageQueue{get { return recMessageQueue; }}public static BlockingCollection<Msg> SendMessageQueue{get { return sendMessageQueue; }}
}
2、zeroMq功能实现
这次的需求是和域控进行通信,主要使用发布订阅模式,也就是我本地需要一个client
,一个server
。
2.1 zeroMq 介绍
第一次使用zeroMq
,稍微介绍下;ZeroMQ
是一个高性能的异步消息库,旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换,支持多种消息模式,能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ
的一些关键特性和概念:ZeroMQ
支持多种消息模式,包括:
-
请求-响应(Req-Rep):客户端发送请求,服务器处理并回复。
-
发布-订阅(Pub-Sub):发布者发布消息,订阅者接收感兴趣的消息。
-
推送-拉取(Push-Pull):用于分布式任务处理,推送端将任务发送到拉取端。
-
管道(Pipeline):将多个组件连接起来形成数据处理管道。简单说就是一个
TCP
通信的框架,可以在本地作为客户端和服务器 官方网站:https://zeromq.org/languages/csharp/
2.2 插件介绍
zeromq
的在C#
上主要是通过netMq
库,这玩意好多年不更新了 具体地址:https://github.com/zeromq/netmq 这玩意折腾了好久,第一次上手,主要要注意版本,通过Nuget 安装
Install-Package NetMQ
不知道为什么我安装不成功,unity里还是无法使用,我直接拷贝了dll
到plugins
拷贝的时候注意依赖项,总共有三个,要不然会报错
2.3 代码实现
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;public class ZeroStarter : MonoBehaviour
{private SubscriberSocket subscriber;private PublisherSocket publisherSocket;private bool isRunning = true;void Start(){AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作// 初始化发布者publisherSocket = new PublisherSocket();publisherSocket.Bind("tcp://*:5557");subscriber = new SubscriberSocket();subscriber.Connect("tcp://localhost:5556");// 启动发布和订阅任务Task.Run(() => StartPub());Task.Run(() => StartSubscriber());}private void StartPub(){while (isRunning){try{// 使用 BlockingCollection 的 Take 方法获取消息Msg message = MsgQueue.SendMessageQueue.Take();publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);}catch (Exception e){Debug.LogError("Error in StartPub: " + e.Message);}}publisherSocket?.Close(); // 确保发布套接字在退出时关闭}private void StartSubscriber(){subscriber.Subscribe("");while (isRunning){try{// 通过超时和 TryReceiveFrameString 检查订阅消息if (subscriber.TryReceiveFrameString(out string topic)){byte[] bytes = subscriber.ReceiveFrameBytes();MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));string str = Encoding.Default.GetString(bytes);Debug.Log($"{topic} {str}");}Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环}catch (Exception e){Debug.LogError("Error in StartSubscriber: " + e.Message);}}subscriber.Close(); // 手动关闭订阅者套接字}private void OnDestroy(){isRunning = false; // 设置标志位,通知任务退出// 确保发布套接字关闭publisherSocket?.Close();NetMQConfig.Cleanup(); // 清理 NetMQ}
}
这里有一个注意点就是zeroMq 不支持topic, 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险,也很奇怪,但是貌似是官方推荐的做法,不纠结,就这样吧。
3、Mqtt功能实现
3.1 emqx介绍
mqtt也是一个消息队列,主要是用在IOT
,简单来说就是一个TCP
服务器,消息头会小一些。适合不稳定的网络。我们用的是Emqx
,还行,挺好用。工具客户端使用的是mqttx
,下载地址:https://mqttx.app/zh 官方网站:https://www.emqx.com/zh
3.2 unity插件
这里使用的是官方推荐的插件库 sdk 介绍地址:https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html
官方示例:https://github.com/emqx/MQTT-Client-Examples
我这里使用的是:https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库,也不起作用,不知道为毛,盲猜是版本的问题,打开上面库,下载之后自己编译 因为用unity
,所以打开了mono
的库
编译之后
将生成的dll
拷贝到plugins
下
3.3 代码实现
using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;public class MqttStarter : MonoBehaviour
{// Start is called before the first frame updatevoid Start(){string ipAddr = "192.168.3.8";int emqxPort = 1883;string clientId = "csharpclientid";//服务器默认密码是这个string username = "username";string password = "pwd";MqttClient client = new MqttClient(ipAddr, emqxPort, false, null, null, MqttSslProtocols.None);// register to message receivedclient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;client.Connect(clientId,username,password);// client.Subscribe(new string[] { "idse/cloud2veh/#" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });}private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e){MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));Debug.Log("mqtt--> " +e.Topic +" ==== >" + Encoding.Default.GetString(e.Message));}
}
这里需要注意的是m2mqtt
会自动开启线程,不需要单独实现线程。其他的都是常规操作。
4、验证
先看下zeromq
的收发
验证下Mqtt的收发
5、Java代码zeromq的使用
pom中加入
<dependency><groupId>org.zeromq</groupId><artifactId>jeromq</artifactId><version>0.5.2</version></dependency>
zeromq 发布者
package org.example;/*** Hello world!**/
import org.zeromq.ZMQ;
import org.zeromq.ZContext;public class App {public static void main(String[] args) {int i = 0;// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个发布者socketZMQ.Socket publisher = context.createSocket(ZMQ.PUB);// 绑定到指定的端口String address = "tcp://*:5556";publisher.bind(address);System.out.println("Publisher started at " + address);// 持续发送消息int messageNumber = 0;while (!Thread.currentThread().isInterrupted()) {String message = "Message " + messageNumber;publisher.sendMore("t:"+ (messageNumber%2));publisher.send("abc".getBytes());System.out.println("Sent: " + message);// 增加消息计数并休眠1秒messageNumber++;Thread.sleep(1000);}// 关闭发布者socketpublisher.close();} catch (Exception e) {e.printStackTrace();}}
}
zeromq 订阅者
package org.example;import org.zeromq.ZMQ;
import org.zeromq.ZContext;import java.nio.ByteBuffer;public class ZmqSubscriber {public static void main(String[] args) {// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个订阅者socketZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);// 连接到发布者的地址String address = "tcp://localhost:5557"; // 确保使用正确的地址subscriber.connect(address);System.out.println("Subscriber connected to " + address);// 订阅所有消息subscriber.subscribe("".getBytes()); // 订阅所有消息,可以根据需要指定主题// 持续接收消息while (!Thread.currentThread().isInterrupted()) {// 接收消息String topic = subscriber.recvStr(0);byte[] message = subscriber.recv(0);ByteBuffer wrap = ByteBuffer.wrap(message);if (message != null) {System.out.println("Received: " + topic +" " + wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());}}// 关闭订阅者socketsubscriber.close();} catch (Exception e) {e.printStackTrace();}}
}
6、总结
第一次搞zeromq
的消息队列,和平常用的kafka
和rocketmq
差的很多,甚至完全不在同一个讨论方向。
还有一些需要研究
-
在
unity
中nuget
的学习 -
c#
不同平台的学习 -
启动后台线程之后无法关闭,导致
unity
死掉。
byte[] bytes = BitConverter.GetBytes(66666);
-
C#侧生成的bytes 是小端序列