如何在网页端实现MQTT消息的发布和订阅?
- 实现MQTT功能,可以发布和订阅主题
- 通过WebSocket协议将MQTT消息转发给对应的网页端
带着这个实现思路,采用C#控制台程序实现MQTT服务端功能,web端可以直接使用websocket插件与服务端双向通讯。
- 新建C#控制台程序,.net framework框架版本4.5.2
- 引用Fleck框架,版本1.2.0
- 引用MQTTnet框架,版本4.3.1.873
1、WebSocket功能实现
socketDic 字典key值记录每个客户端,value记录每个客户端对应的SN,代表设备的序列号;
每当有网页端客户端连接的时候,自动将客户端套接字加入字典中;
每当网页关闭,服务端自动断开客户端套接字连接,并从缓存字典中移除此客户端套接字;
当网页端客户端套接字发送包含SN:开头的字符,表示网页端发送了设备的SN,服务端接收此消息,并解析设备SN,并与对应的客户端套接字绑定对应关系,在缓存字典中维护对应关系。
class Server
{static Dictionary<IWebSocketConnection, string> socketDic = new Dictionary<IWebSocketConnection, string>();static void Main(){FleckLog.Level = LogLevel.Debug;var server = new WebSocketServer("ws://0.0.0.0:8181");//var socketDic = new Dictionary<IWebSocketConnection, string>();server.Start(socket =>{socket.OnOpen = () =>{Console.WriteLine("Open!");socketDic.Add(socket, null);};socket.OnClose = () =>{Console.WriteLine("Close!");socketDic.Remove(socket);};socket.OnMessage = message =>{if (message.Contains("SN:") && socketDic.ContainsKey(socket)){socketDic[socket] = message.Replace("SN:", "");}};});Client();var input = Console.ReadLine();while (input != "exit"){foreach (var socket in socketDic.ToList()){socket.Key.Send(input);}input = Console.ReadLine();}}
}
2、MQTT功能实现
MQTT服务器采用EMQX公共服务器测试,服务器域名:broker.emqx.io,端口:1883,用户名:emqx_test,密码:emqx_test
订阅主题名:123、12345
MQTT接收消息事件ApplicationMessageReceivedAsync,判断消息主题与缓存字典中的套接字是否对应,如消息主题与缓存字典SN存在包含关系,则认为此消息与网页端套接字存在关联关系,则将消息推送给此套接字。
具体实现代码:
static IMqttClient client;public static async void Client(){try{client = new MqttFactory().CreateMqttClient() as MqttClient;var build = new MqttClientOptionsBuilder().WithClientId(Guid.NewGuid().ToString().Replace("-", "").ToUpper()).WithCredentials("emqx_test", "emqx_test").WithTcpServer("broker.emqx.io", 1883).WithCleanSession(true).WithKeepAlivePeriod(TimeSpan.FromSeconds(100.5));client.ConnectedAsync += _mqttClient_ConnectedAsync;await client.ConnectAsync(build.Build());client.DisconnectedAsync += _mqttClient_DisconnectedAsync;client.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;await client.SubscribeAsync(new MqttClientSubscribeOptions{TopicFilters = new List<MqttTopicFilter> {new MqttTopicFilter() //订阅消息对象{Topic = "123", //订阅消息主题QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce //消息类型},new MqttTopicFilter() //订阅消息对象{Topic = "12345", //订阅消息主题QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce //消息类型}}});}catch (Exception e){Console.WriteLine($"连接失败");}}private static Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg){var msg = arg.ApplicationMessage.ConvertPayloadToString();foreach (var item in socketDic){if (item.Value!=null && arg.ApplicationMessage.Topic.Contains(item.Value)){item.Key.Send($"hello {item.Value},{msg}");}}Console.WriteLine(arg.ApplicationMessage.ConvertPayloadToString());return Task.FromResult("");}private static Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg){Console.WriteLine($"客户端“{client.Options.ClientId}”已断开MQTT服务器!");return Task.FromResult("");}private static Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg){Console.WriteLine($"客户端“{client.Options.ClientId}”已连接MQTT服务器!");return Task.FromResult("");}
3、网页端实现
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head><title>websocket client</title><script type="text/javascript">var start = function () {var inc = document.getElementById('incomming');var wsImpl = window.WebSocket || window.MozWebSocket;var form = document.getElementById('sendForm');var input = document.getElementById('sendText');inc.innerHTML += "connecting to server ..<br/>";// create a new websocket and connectwindow.ws = new wsImpl('ws://localhost:8181/');// when data is comming from the server, this metod is calledws.onmessage = function (evt) {inc.innerHTML += evt.data + '<br/>';};// when the connection is established, this method is calledws.onopen = function () {inc.innerHTML += '.. connection open<br/>';ws.send("SN:12345");};// when the connection is closed, this method is calledws.onclose = function () {inc.innerHTML += '.. connection closed<br/>';}//form.addEventListener('submit', function(e){// e.preventDefault();// var val = input.value;// ws.send(val);// input.value = "";//}); }window.onload = start;</script>
</head>
<body><form id="sendForm"><input id="sendText" placeholder="Text to send" /></form><pre id="incomming"></pre>
</body>
</html>
4、测试
启动WebSocket服务器控制台应用程序;
运行MQTT.fx工具,新建连接
给12345主题发布消息,
观察网页端是否收到消息,