已经有很长一段时间没有写代码了。为了不让自己的编码技能退步,我为BeetleX扩展了MQTT协议支持,以此保持代码设计和编写能力。接下来简单介绍一下如何使用BeetleX.MQTT来构建对应的TCP或WebSocket服务。
以下实现针对MQTT 3.1.1版本,协议的实现并不复杂,这里就不介绍了,具体代码可以访问 https://github.com/beetlex-io/mqtt 了解。接下来分享如何使用BeetleX.MQTT实现TCP和WebSocket服务。
TCP服务
using BeetleX.MQTT.Messages;
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
namespace BeetleX.MQTT.Server
{
    /// <summary>
    /// Sample MQTT server over TCP built with <c>ServerBuilder</c>.
    /// Listens on 127.0.0.1:9090 and registers one handler per MQTT control packet type.
    /// </summary>
    class Program
    {
        private static ServerBuilder<MQTTApplication, MQTTUser, MQTTPacket> server;

        /// <summary>
        /// Configures the server, wires up the packet handlers, starts it and then
        /// blocks on console input so the process stays alive.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            server = new ServerBuilder<MQTTApplication, MQTTUser, MQTTPacket>();
            server.ConsoleOutputLog = true;

            server
                .SetOptions(option =>
                {
                    option.DefaultListen.Port = 9090;
                    option.DefaultListen.Host = "127.0.0.1";
                    option.LogLevel = EventArgs.LogType.Trace;
                })
                .OnMessageReceive<CONNECT>(e =>
                {
                    // Security: the client's password is deliberately NOT written to the log;
                    // credentials must never end up in log output.
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.NetSession.RemoteEndPoint} connect name:{e.Message.UserName}");

                    // Remember who this session belongs to for later packets.
                    e.Session.UserName = e.Message.UserName;
                    e.Session.ID = e.Message.ClientID;

                    // NOTE(review): no credential validation happens here; every CONNECT is
                    // answered with a default CONNACK. Fine for a demo, not for production.
                    e.Return(new CONNACK());
                })
                .OnMessageReceive<SUBSCRIBE>(e =>
                {
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} subscribe {e.Message}");

                    // Acknowledge with the same packet identifier the client sent.
                    var ack = new SUBACK
                    {
                        Identifier = e.Message.Identifier,
                        Status = QoSType.MostOnce
                    };
                    e.Return(ack);

                    e.Application.RegisterSubscribe(e.Message, e.Session);
                })
                .OnMessageReceive<UNSUBSCRIBE>(e =>
                {
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} unsubscribe {e.Message}");

                    // NOTE(review): SUBACK and PUBACK copy e.Message.Identifier into the ack,
                    // but this UNSUBACK does not. MQTT 3.1.1 requires UNSUBACK to carry the
                    // UNSUBSCRIBE packet identifier - confirm whether UNSUBACK exposes an
                    // Identifier property (or the library fills it in) and set it if needed.
                    e.Return(new UNSUBACK());

                    e.Application.UnRegisterSubscribe(e.Message, e.Session);
                })
                .OnMessageReceive<PUBLISH>(e =>
                {
                    // Decode the payload only when it will actually be logged.
                    var log = e.GetLoger(EventArgs.LogType.Info);
                    if (log != null)
                    {
                        var payload = e.Message.PayLoadData;
                        var data = Encoding.UTF8.GetString(payload.Array, payload.Offset, payload.Count);
                        log.Log(
                            EventArgs.LogType.Info,
                            e.NetSession,
                            $"{e.Session.ID} publish {e.Message.Topic}@ {e.Message.Identifier} data:{data}");
                    }

                    // NOTE(review): a PUBACK is returned for every PUBLISH regardless of its
                    // QoS level; per MQTT 3.1.1 PUBACK is the response to QoS 1 only - verify.
                    var ack = new PUBACK
                    {
                        Identifier = e.Message.Identifier
                    };
                    e.Return(ack);

                    // Hand the message to the application for delivery.
                    e.Application.Publish(e.Message);
                })
                .OnMessageReceive<PINGREQ>(e =>
                {
                    // Keep-alive: answer every ping request with a ping response.
                    e.Return(new PINGRESP());
                })
                .OnMessageReceive(e =>
                {
                    // Catch-all handler registered without a message type: intentionally a no-op.
                })
                .Run();

            // Block the main thread until a key is pressed.
            Console.Read();
        }
    }
}
WebSocket服务
using BeetleX.MQTT.Messages;
using System;
using System.Text;

namespace BeetleX.MQTT.WSServer
{
    /// <summary>
    /// Sample MQTT server over WebSocket built with <c>MQTTWebsocketServer</c>.
    /// The server is constructed with the value 8081 and registers one handler per
    /// MQTT control packet type.
    /// </summary>
    class Program
    {
        private static MQTTWebsocketServer<MQTTApplication, MQTTUser> mServer;

        /// <summary>
        /// Configures the WebSocket server, wires up the packet handlers, starts it and
        /// then blocks on console input so the process stays alive.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            mServer = new MQTTWebsocketServer<MQTTApplication, MQTTUser>(8081);

            mServer.Setting((service, options) =>
            {
                options.LogLevel = EventArgs.LogType.Trace;
                options.LogToConsole = true;
                // Use the MQTT formatter as the WebSocket frame serializer.
                options.WebSocketFrameSerializer = new MQTTFormater();
            });

            mServer
                .OnMessageReceive<CONNECT>(e =>
                {
                    // Security: the client's password is deliberately NOT written to the log;
                    // credentials must never end up in log output.
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.NetSession.RemoteEndPoint} connect name:{e.Message.UserName}");

                    // Remember who this session belongs to for later packets.
                    e.Session.UserName = e.Message.UserName;
                    e.Session.ID = e.Message.ClientID;

                    // NOTE(review): no credential validation happens here; every CONNECT is
                    // answered with a default CONNACK. Fine for a demo, not for production.
                    e.Return(new CONNACK());
                })
                .OnMessageReceive<SUBSCRIBE>(e =>
                {
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} subscribe {e.Message}");

                    // Acknowledge with the same packet identifier the client sent.
                    var ack = new SUBACK
                    {
                        Identifier = e.Message.Identifier,
                        Status = QoSType.MostOnce
                    };
                    e.Return(ack);

                    e.Application.RegisterSubscribe(e.Message, e.Session);
                })
                .OnMessageReceive<UNSUBSCRIBE>(e =>
                {
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} unsubscribe {e.Message}");

                    // NOTE(review): SUBACK and PUBACK copy e.Message.Identifier into the ack,
                    // but this UNSUBACK does not. MQTT 3.1.1 requires UNSUBACK to carry the
                    // UNSUBSCRIBE packet identifier - confirm whether UNSUBACK exposes an
                    // Identifier property (or the library fills it in) and set it if needed.
                    e.Return(new UNSUBACK());

                    e.Application.UnRegisterSubscribe(e.Message, e.Session);
                })
                .OnMessageReceive<PUBLISH>(e =>
                {
                    // Decode the payload only when it will actually be logged.
                    var log = e.GetLoger(EventArgs.LogType.Info);
                    if (log != null)
                    {
                        var payload = e.Message.PayLoadData;
                        var data = Encoding.UTF8.GetString(payload.Array, payload.Offset, payload.Count);
                        log.Log(
                            EventArgs.LogType.Info,
                            e.NetSession,
                            $"{e.Session.ID} publish {e.Message.Topic}@ {e.Message.Identifier} data:{data}");
                    }

                    // NOTE(review): a PUBACK is returned for every PUBLISH regardless of its
                    // QoS level; per MQTT 3.1.1 PUBACK is the response to QoS 1 only - verify.
                    var ack = new PUBACK
                    {
                        Identifier = e.Message.Identifier
                    };
                    e.Return(ack);

                    // Hand the message to the application for delivery.
                    e.Application.Publish(e.Message);
                })
                .OnMessageReceive<PINGREQ>(e =>
                {
                    // Keep-alive: answer every ping request with a ping response.
                    e.Return(new PINGRESP());
                })
                .OnMessageReceive(e =>
                {
                    // Catch-all handler registered without a message type: intentionally a no-op.
                })
                .Run();

            // Block the main thread until a key is pressed.
            Console.Read();
        }
    }
}
BeetleX
开源跨平台通讯框架(支持TLS)
提供HTTP,Websocket,MQTT,Redis,RPC和服务网关开源组件
https://beetlex-io.com