.NET Core / .NET 6 / .NET 8 实现 MQTT 服务端
- Mqtt服务端代码
- `IMqttServer` 接口
- 业务类,实现 `IMqttServer` 接口
- `Program.cs`
直接上代码
NuGet 引用
MQTTnet
Mqtt服务端代码
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace Code.Mqtt
{
    /// <summary>
    /// Hosts an MQTTnet broker and forwards its events to an <see cref="IMqttServer"/> implementation.
    /// </summary>
    /// <remarks>
    /// Constructing an instance wires up all event handlers and starts the broker in the background.
    /// </remarks>
    public class MqttServerBase
    {
        /// <summary>The underlying MQTTnet server instance.</summary>
        public MqttServer _server;

        readonly IMqttServer _mqttServer;

        // Guards Topic_Client inside this class. Code that touches the public
        // dictionary directly from outside is not synchronised by this lock.
        readonly object _topicLock = new object();

        /// <summary>
        /// Publishes a message to a topic: first argument is the topic, second the payload text.
        /// </summary>
        public Action<string, string> ToTopic;

        /// <summary>
        /// Maps each subscribed topic filter to the ids of the clients that subscribed to it.
        /// </summary>
        public Dictionary<string, List<string>> Topic_Client = new Dictionary<string, List<string>>();

        /// <summary>
        /// Builds the broker on the port supplied by <paramref name="mqttServer"/>, hooks up the
        /// event handlers and starts the broker.
        /// </summary>
        /// <param name="mqttServer">Business handler that receives the broker events.</param>
        /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is null.</exception>
        public MqttServerBase(IMqttServer mqttServer)
        {
            if (mqttServer == null)
            {
                throw new ArgumentNullException(nameof(mqttServer), "MqttServer配置错误");
            }

            _mqttServer = mqttServer;

            var optionbuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpoint()                       // enable the default endpoint
                .WithDefaultEndpointPort(_mqttServer.Port);  // port comes from the handler
            _server = new MqttFactory().CreateMqttServer(optionbuilder.Build());

            ToTopic = (topic, msg) =>
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(msg)
                    .Build();
                // The delegate is an Action, so the injection cannot be awaited by the caller.
                _server.InjectApplicationMessage(new InjectedMqttApplicationMessage(message));
            };

            // Handler tasks are returned to MQTTnet instead of being dropped, so that
            // exceptions thrown by the business handler are no longer silently lost.
            _server.ClientConnectedAsync += e => _mqttServer.ClientConnectedAsync(e.ClientId, e);
            _server.ClientDisconnectedAsync += e => _mqttServer.ClientDisconnectedAsync(e.ClientId, e);

            _server.InterceptingPublishAsync += e =>
            {
                var message = e.ApplicationMessage;
                // Only topics that some client subscribed to are forwarded.
                // NOTE(review): the lookup is an exact key match, so wildcard filters
                // (+ / #) will not match concrete topics - confirm this is intended.
                if (message == null || !IsTrackedTopic(message.Topic))
                {
                    return Task.CompletedTask;
                }

                return _mqttServer.InterceptingPublishAsync(e.ClientId, message.Topic, ReadPayload(message), e, ToTopic);
            };

            _server.ApplicationMessageNotConsumedAsync += e =>
            {
                var message = e.ApplicationMessage;
                // The original author notes that forwarding without this topic check
                // ends up in an endless loop.
                if (message == null || !IsTrackedTopic(message.Topic))
                {
                    return Task.CompletedTask;
                }

                return _mqttServer.ApplicationMessageNotConsumedAsync(message.Topic, ReadPayload(message), e);
            };

            _server.ValidatingConnectionAsync += e =>
            {
                var accepted = _mqttServer.ValidatingConnectionAsync(e.UserName, e.Password, e.ClientId, e);
                e.ReasonCode = accepted
                    ? MqttConnectReasonCode.Success   // credentials accepted
                    : MqttConnectReasonCode.Banned;   // credentials rejected
                return Task.CompletedTask;
            };

            // Subscribe: remember topic -> client, then notify the handler.
            _server.ClientSubscribedTopicAsync += e =>
            {
                var topic = e.TopicFilter.Topic;
                TrackSubscription(topic, e.ClientId);
                return _mqttServer.ClientSubscribedTopicAsync(e.ClientId, topic, e);
            };

            // Unsubscribe: forget the client and notify the handler when the topic was tracked.
            _server.ClientUnsubscribedTopicAsync += e =>
            {
                var topic = e.TopicFilter;
                if (!UntrackSubscription(topic, e.ClientId))
                {
                    return Task.CompletedTask;
                }

                return _mqttServer.ClientUnsubscribedTopicAsync(e.ClientId, topic, e);
            };

            // Server started / stopped notifications.
            _server.StartedAsync += _mqttServer.StartedAsync;
            _server.StoppedAsync += _mqttServer.StoppedAsync;

            // A constructor cannot await; the helper observes the task so that a
            // startup failure is reported instead of disappearing.
            _ = StartInBackground();
        }

        /// <summary>Starts the broker and writes progress messages to the console.</summary>
        public async Task Start()
        {
            Console.WriteLine("正在启动Mqtt服务");
            await _server.StartAsync();
            Console.WriteLine("Mqtt服务启动成功,端口:" + _mqttServer.Port);
        }

        /// <summary>Stops the broker and writes progress messages to the console.</summary>
        public async Task Stop()
        {
            Console.WriteLine("正在停止Mqtt服务");
            await _server.StopAsync();
            Console.WriteLine("Mqtt服务停止");
        }

        /// <summary>Restarts the broker by stopping and then starting it.</summary>
        /// <returns>A task that completes when the broker has been started again.</returns>
        public async Task ReStart()
        {
            await Stop();
            await Start();
        }

        // Runs Start() and logs a failure instead of leaving the exception unobserved.
        async Task StartInBackground()
        {
            try
            {
                await Start();
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine("Mqtt服务启动失败:" + ex);
            }
        }

        // Converts the message payload to text; yields null when there is no payload array.
        // NOTE(review): this passes the segment's whole backing array to BToString and
        // ignores Offset/Count - confirm the payload always fills its array.
        static string ReadPayload(MqttApplicationMessage message)
        {
            return message.PayloadSegment.Array?.BToString();
        }

        // True when at least one subscription was recorded under exactly this topic.
        bool IsTrackedTopic(string topic)
        {
            if (topic == null)
            {
                return false;
            }

            lock (_topicLock)
            {
                return Topic_Client.ContainsKey(topic);
            }
        }

        // Records that clientId subscribed to topic (no duplicates).
        void TrackSubscription(string topic, string clientId)
        {
            lock (_topicLock)
            {
                if (!Topic_Client.TryGetValue(topic, out var clients))
                {
                    clients = new List<string>();
                    Topic_Client.Add(topic, clients);
                }

                if (!clients.Contains(clientId))
                {
                    clients.Add(clientId);
                }
            }
        }

        // Removes clientId from topic and drops the topic once nobody is left.
        // Returns false when the topic was not tracked at all.
        bool UntrackSubscription(string topic, string clientId)
        {
            lock (_topicLock)
            {
                if (!Topic_Client.TryGetValue(topic, out var clients))
                {
                    return false;
                }

                clients.Remove(clientId);
                if (clients.Count == 0)
                {
                    Topic_Client.Remove(topic);
                }

                return true;
            }
        }
    }
}
`IMqttServer` 接口
using MQTTnet.Server;

namespace Code.Mqtt
{
    /// <summary>
    /// Callbacks a business class implements to react to MQTT broker events.
    /// </summary>
    public interface IMqttServer
    {
        /// <summary>
        /// Port the broker listens on.
        /// </summary>
        int Port { get; }

        /// <summary>
        /// Raised when the broker has started.
        /// </summary>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task StartedAsync(EventArgs args);

        /// <summary>
        /// Raised when the broker has stopped.
        /// </summary>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task StoppedAsync(EventArgs args);

        /// <summary>
        /// Raised when a client comes online.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ClientConnectedAsync(string clientId, ClientConnectedEventArgs args);

        /// <summary>
        /// Raised when a client goes offline.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ClientDisconnectedAsync(string clientId, ClientDisconnectedEventArgs args);

        /// <summary>
        /// Raised when a client publishes a message.
        /// </summary>
        /// <param name="clientId">Id of the publishing client.</param>
        /// <param name="Topic">Topic the message was published to.</param>
        /// <param name="msg">Message content as text.</param>
        /// <param name="args">Original event arguments.</param>
        /// <param name="ToTopic">Sends a message: (topic, content).</param>
        /// <returns>A task representing the handler's work.</returns>
        Task InterceptingPublishAsync(
            string clientId,
            string Topic,
            string msg,
            InterceptingPublishEventArgs args,
            Action<string, string> ToTopic);

        /// <summary>
        /// Validates a connecting client.
        /// </summary>
        /// <param name="username">Account name.</param>
        /// <param name="password">Password.</param>
        /// <param name="clientId">Id of the connecting client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns><c>true</c> to accept the connection, <c>false</c> to reject it.</returns>
        bool ValidatingConnectionAsync(
            string username,
            string password,
            string clientId,
            ValidatingConnectionEventArgs args);

        /// <summary>
        /// Raised when a message was not consumed.
        /// </summary>
        /// <param name="Topic">Topic of the message.</param>
        /// <param name="msg">Message content as text.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ApplicationMessageNotConsumedAsync(
            string Topic,
            string msg,
            ApplicationMessageNotConsumedEventArgs args);

        /// <summary>
        /// Raised when a client subscribes to a topic.
        /// </summary>
        /// <param name="clientId">Id of the subscribing client.</param>
        /// <param name="Topic">Topic that was subscribed.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ClientSubscribedTopicAsync(
            string clientId,
            string Topic,
            ClientSubscribedTopicEventArgs args);

        /// <summary>
        /// Raised when a client unsubscribes from a topic.
        /// </summary>
        /// <param name="clientId">Id of the unsubscribing client.</param>
        /// <param name="Topic">Topic that was unsubscribed.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ClientUnsubscribedTopicAsync(
            string clientId,
            string Topic,
            ClientUnsubscribedTopicEventArgs args);
    }
}
业务类,实现 `IMqttServer` 接口
/// <summary>
/// Sample business class: implements <see cref="IMqttServer"/> and logs every event to the console.
/// </summary>
public class MqttApp : IMqttServer
{
    /// <summary>
    /// Port the broker listens on.
    /// </summary>
    int IMqttServer.Port => 10883;

    public MqttApp()
    {
    }

    /// <summary>
    /// A message was not consumed.
    /// </summary>
    /// <param name="Topic">Topic of the message.</param>
    /// <param name="msg">Message content.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ApplicationMessageNotConsumedAsync(string Topic, string msg, ApplicationMessageNotConsumedEventArgs args)
    {
        Console.WriteLine($"消息未消费{Topic}:");
        Console.WriteLine(msg);
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client came online.
    /// </summary>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ClientConnectedAsync(string clientId, ClientConnectedEventArgs args)
    {
        Console.WriteLine($"客户端上线 id:{clientId}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client went offline.
    /// </summary>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ClientDisconnectedAsync(string clientId, ClientDisconnectedEventArgs args)
    {
        Console.WriteLine($"客户端下线 id:{clientId}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client subscribed to a topic.
    /// </summary>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="Topic">Subscribed topic.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ClientSubscribedTopicAsync(string clientId, string Topic, ClientSubscribedTopicEventArgs args)
    {
        Console.WriteLine($"客户端{clientId}订阅主题:{Topic}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client unsubscribed from a topic.
    /// </summary>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="Topic">Unsubscribed topic.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ClientUnsubscribedTopicAsync(string clientId, string Topic, ClientUnsubscribedTopicEventArgs args)
    {
        Console.WriteLine($"客户端{clientId} 取消主题订阅:{Topic}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client published a message.
    /// </summary>
    /// <param name="clientId">Id of the publishing client.</param>
    /// <param name="Topic">Topic of the message.</param>
    /// <param name="msg">Message content.</param>
    /// <param name="args">Original event arguments.</param>
    /// <param name="ToTopic">Pushes a message to a topic: ("topic", "content").</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.InterceptingPublishAsync(string clientId, string Topic, string msg, InterceptingPublishEventArgs args, Action<string, string> ToTopic)
    {
        Console.WriteLine($"客户端{clientId} 主题{Topic} 发送消息 内容:");
        Console.WriteLine(msg);

        // Demo: push a message to a fixed topic.
        ToTopic("主题", "内容");
        return Task.CompletedTask;
    }

    /// <summary>
    /// The broker has started.
    /// </summary>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.StartedAsync(EventArgs args)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// The broker has stopped.
    /// </summary>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.StoppedAsync(EventArgs args)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Validates the account and password of a connecting client.
    /// </summary>
    /// <param name="username">Account name.</param>
    /// <param name="password">Password.</param>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns><c>true</c> to accept the client; this sample accepts everyone.</returns>
    bool IMqttServer.ValidatingConnectionAsync(string username, string password, string clientId, ValidatingConnectionEventArgs args)
    {
        // The password is deliberately not written to the log.
        Console.WriteLine($"验证客户端{clientId}信息:{username}");

        // Return false here to reject the connection.
        return true;
    }
}
Program.cs
// Dependency injection: register the business handler and the broker wrapper as singletons.
builder.Services.AddSingleton<IMqttServer, MqttApp>();
builder.Services.AddSingleton<MqttServerBase>();

/*
 * Without the code below the MQTT service is not started when the application starts;
 * it would only be created once something (e.g. a controller) injects MqttServerBase.
 * Resolving it from app.Services accesses the object once and thereby creates it.
 * (This part uses `app`, which is built outside this snippet.)
 */

// Option 1 - start the MQTT service immediately:
// app.Services.GetRequiredService<MqttServerBase>();

// Option 2 - start the MQTT service after a delay:
_ = Task.Run(async () =>
{
    try
    {
        await Task.Delay(3000);
        // GetRequiredService throws when the registration is missing instead of
        // returning null and leaving the broker silently unstarted.
        app.Services.GetRequiredService<MqttServerBase>();
    }
    catch (Exception ex)
    {
        // Nobody awaits this task, so report the failure here.
        Console.Error.WriteLine("Mqtt服务启动失败:" + ex);
    }
});