.Net Core/.net 6/.Net 8 实现Mqtt服务器

.Net Core/.net 6/.Net 8 实现Mqtt服务端

  • Mqtt服务端代码
  • `IMqttServer` 接口
  • 业务类,实现 `IMqttServer` 接口
  • `Program.cs`

直接上代码
nuget 引用
MQTTnet

Mqtt服务端代码

using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;namespace Code.Mqtt
{/// <summary>/// mqtt服务端/// </summary>public class MqttServerBase{public MqttServer _server;readonly IMqttServer _mqttServer;/// <summary>/// 向指定主题发送消息/// </summary>public Action<string, string> ToTopic;/// <summary>/// 主题/客户端列表/// </summary>public Dictionary<string,List<string>> Topic_Client=new Dictionary<string, List<string>>();public MqttServerBase(IMqttServer mqttServer){_mqttServer = mqttServer;if(mqttServer == null){throw new Exception("MqttServer配置错误");}var optionbuilder = new MqttServerOptionsBuilder().WithDefaultEndpoint()//设置默认地址127.0.0.1.WithDefaultEndpointPort(_mqttServer.Port);//1883_server = new MqttFactory().CreateMqttServer(optionbuilder.Build());ToTopic = (topic, msg) => {_server.InjectApplicationMessage(new InjectedMqttApplicationMessage(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(msg).Build()));};_server.ClientConnectedAsync += (e) =>{_mqttServer.ClientConnectedAsync(e.ClientId, e);return Task.CompletedTask;};_server.ClientDisconnectedAsync += (e) => {_mqttServer.ClientDisconnectedAsync(e.ClientId, e);return Task.CompletedTask;};_server.InterceptingPublishAsync += (e)=> {var msg = e.ApplicationMessage?.PayloadSegment.Array?.BToString();var Topic = e.ApplicationMessage.Topic;//判断主题是否存在if (Topic_Client.ContainsKey(Topic)){_mqttServer.InterceptingPublishAsync(e.ClientId, Topic, msg, e, ToTopic);}return Task.CompletedTask;};_server.ApplicationMessageNotConsumedAsync += (e) => {var Topic = e.ApplicationMessage.Topic;var msg = e.ApplicationMessage.PayloadSegment.Array.BToString();//判断主题是否存在,否则会进入死循环if (Topic_Client.ContainsKey(Topic)){_mqttServer.ApplicationMessageNotConsumedAsync(Topic, msg, e);}return Task.CompletedTask;};_server.ValidatingConnectionAsync += (e) => {if (_mqttServer.ValidatingConnectionAsync(e.UserName, e.Password,e.ClientId, e)){e.ReasonCode = MqttConnectReasonCode.Success;//验证通过}else{e.ReasonCode = MqttConnectReasonCode.Banned;//验证不通过}return Task.CompletedTask;};//订阅主题_server.ClientSubscribedTopicAsync += (e) =>{var _topic = e.TopicFilter.Topic;//保存主题if (!Topic_Client.ContainsKey(_topic)){Topic_Client.Add(_topic, new List<string>());}//添加订阅主题的客户端if (!Topic_Client[_topic].Any(x=>x== e.ClientId)){Topic_Client[_topic].Add(e.ClientId);}_mqttServer.ClientSubscribedTopicAsync(e.ClientId, _topic, e);return Task.CompletedTask;};//取消订阅_server.ClientUnsubscribedTopicAsync += (e) =>{var _topic = e.TopicFilter;//移除客户端if (!Topic_Client.ContainsKey(_topic)){Topic_Client[_topic].Remove(e.ClientId);if (Topic_Client[_topic].Count == 0){// 移除没有客户端订阅的主题Topic_Client.Remove(_topic);}_mqttServer.ClientUnsubscribedTopicAsync(e.ClientId, e.TopicFilter, e);}return Task.CompletedTask;};//服务启动事件_server.StartedAsync += _mqttServer.StartedAsync;//服务停止事件_server.StoppedAsync += _mqttServer.StoppedAsync;Start();}public async Task Start(){Console.WriteLine("正在启动Mqtt服务");await _server.StartAsync();Console.WriteLine("Mqtt服务启动成功,端口:" + _mqttServer.Port);}public async Task Stop(){Console.WriteLine("正在停止Mqtt服务");await _server.StopAsync();Console.WriteLine("Mqtt服务停止");}/// <summary>/// 重启服务/// </summary>/// <returns></returns>public async Task ReStart(){await Stop();await Start();}}
}

IMqttServer 接口


using MQTTnet.Server;namespace Code.Mqtt
{public interface IMqttServer{/// <summary>/// 服务端口/// </summary>int Port { get;}/// <summary>/// 服务启动事件/// </summary>/// <param name="args"></param>/// <returns></returns>public Task StartedAsync(EventArgs args);/// <summary>/// 服务停止事件/// </summary>/// <param name="args"></param>/// <returns></returns>public Task StoppedAsync(EventArgs args);/// <summary>/// 客户端上线/// </summary>/// <param name="args"></param>/// <returns></returns>public Task ClientConnectedAsync(string clientId,ClientConnectedEventArgs args);/// <summary>/// 客户端下线/// </summary>/// <param name="args"></param>/// <returns></returns>public Task ClientDisconnectedAsync(string clientId,ClientDisconnectedEventArgs args);/// <summary>/// 消息事件/// </summary>/// <param name="args"></param>/// <param name="ToTopic">发送消息</param>/// <returns></returns>public Task InterceptingPublishAsync(string clientId,string Topic,string msg,InterceptingPublishEventArgs args, Action<string, string> ToTopic);/// <summary>/// 验证/// </summary>/// <param name="username">账号</param>/// <param name="password">密码</param>/// <param name="args"></param>/// <returns></returns>public bool ValidatingConnectionAsync(string username,string password,string clientId,ValidatingConnectionEventArgs args);/// <summary>/// 消息未消费事件/// </summary>/// <param name="args"></param>/// <returns></returns>public Task ApplicationMessageNotConsumedAsync(string Topic,string msg,ApplicationMessageNotConsumedEventArgs args);/// <summary>/// 订阅主题事件/// </summary>/// <param name="args"></param>/// <returns></returns>public Task ClientSubscribedTopicAsync(string clientId,string Topic,ClientSubscribedTopicEventArgs args);/// <summary>/// 取消订阅主题事件/// </summary>/// <param name="args"></param>/// <returns></returns>public Task ClientUnsubscribedTopicAsync(string clientId,string Topic,ClientUnsubscribedTopicEventArgs args);}
}

业务类,实现 IMqttServer 接口

public class MqttApp : IMqttServer{/// <summary>/// 服务端口/// </summary>int IMqttServer.Port { get => 10883; }public MqttApp(){}/// <summary>/// 消息未消费/// </summary>/// <param name="Topic">主题</param>/// <param name="msg">消息内容</param>/// <param name="args">事件原始参数</param>/// <returns></returns>async Task IMqttServer.ApplicationMessageNotConsumedAsync(string Topic,string msg,ApplicationMessageNotConsumedEventArgs args){Console.WriteLine($"消息未消费{Topic}:");Console.WriteLine(msg);}/// <summary>/// 客户端上线/// </summary>/// <param name="clientId">客户端id</param>/// <param name="args">事件原始参数</param>/// <returns></returns>async Task IMqttServer.ClientConnectedAsync(string clientId, ClientConnectedEventArgs args){Console.WriteLine($"客户端上线 id:{clientId}");}/// <summary>/// 客户端下线/// </summary>/// <param name="clientId">客户端id</param>/// <param name="args">事件原始参数</param>/// <returns></returns>async Task IMqttServer.ClientDisconnectedAsync(string clientId, ClientDisconnectedEventArgs args){Console.WriteLine($"客户端下线 id:{clientId}");}/// <summary>/// 订阅主题/// </summary>/// <param name="clientId">客户端id</param>/// <param name="Topic">主题</param>/// <param name="args">事件原始参数</param>/// <returns></returns>async Task IMqttServer.ClientSubscribedTopicAsync(string clientId, string Topic, ClientSubscribedTopicEventArgs args){Console.WriteLine($"客户端{clientId}订阅主题:{Topic}");}/// <summary>/// 取消主题订阅/// </summary>/// <param name="clientId">客户端id</param>/// <param name="Topic">主题</param>/// <param name="args">事件原始参数</param>/// <returns></returns>async Task IMqttServer.ClientUnsubscribedTopicAsync(string clientId, string Topic,ClientUnsubscribedTopicEventArgs args){Console.WriteLine($"客户端{clientId} 取消主题订阅:{Topic}");}/// <summary>/// 收到客户端消息/// </summary>/// <param name="clientId">客户端id</param>/// <param name="Topic">主题</param>/// <param name="msg">消息内容</param>/// <param name="args">事件原始参数</param>/// <param name="ToTopic">推送消息到指定主题 ("主题","内容")</param>/// <returns></returns>async Task IMqttServer.InterceptingPublishAsync(string clientId, string Topic, string msg, InterceptingPublishEventArgs args, Action<string, string> ToTopic){Console.WriteLine($"客户端{clientId} 主题{Topic} 发送消息 内容:");Console.WriteLine(msg);//推送消息到指定主题ToTopic("主题","内容");}/// <summary>/// 服务启动事件/// </summary>/// <param name="args">事件原始参数</param>/// <returns></returns>async Task IMqttServer.StartedAsync(EventArgs args){}/// <summary>/// 服务停止事件/// </summary>/// <param name="args">事件原始参数</param>/// <returns></returns>async Task IMqttServer.StoppedAsync(EventArgs args){}/// <summary>/// 验证账号密码/// </summary>/// <param name="username">账号</param>/// <param name="password">密码</param>/// <param name="clientId">客户端id</param>/// <param name="args">事件原始参数</param>/// <returns></returns>bool IMqttServer.ValidatingConnectionAsync(string username, string password, string clientId, ValidatingConnectionEventArgs args){Console.WriteLine($"验证客户端{clientId}信息:{args.UserName} {args.Password}");return true;//验证通过//return false;//验证不通过}}

Program.cs

// 注入
builder.Services.AddSingleton<IMqttServer, MqttApp>();
builder.Services.AddSingleton<MqttServerBase>();/* 
如果没有下面这段代码,那么程序启动后不会立即启动mqtt服务,需要在控制器注入来初始化实列,
app.Services.GetService 相当于访问了一次对象
*/
//立即启动Mqtt服务
//app.Services.GetService<MqttServerBase>();//延时启动Mqtt服务
Task.Run(async () => {await Task.Delay(3000);app.Services.GetService<MqttServerBase>();
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/730798.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32day3

1.思维导图 1.总结任务的调度算法&#xff0c;把实现代码再写一下 /* Definitions for myTask02 */ osThreadId_t myTask02Handle; uint32_t myTask02Buffer[ 64 ]; osStaticThreadDef_t myTask02ControlBlock; const osThreadAttr_t myTask02_attributes {.name "myTa…

代码随想录算法训练营第三十九天|62.不同路径、63. 不同路径 II

62.不同路径 刷题https://leetcode.cn/problems/unique-paths/description/文章讲解https://programmercarl.com/0062.%E4%B8%8D%E5%90%8C%E8%B7%AF%E5%BE%84.html视频讲解https://www.bilibili.com/video/BV1ve4y1x7Eu/?vd_sourceaf4853e80f89e28094a5fe1e220d9062 题解&…

react的diff源码

react 的 render 阶段&#xff0c;其中 begin 时会调用 reconcileChildren 函数&#xff0c; reconcileChildren 中做的事情就是 react 知名的 diff 过程 diff 算法介绍 react 的每次更新&#xff0c;都会将新的 ReactElement 内容与旧的 fiber 树作对比&#xff0c;比较出它们…

零基础学习JS--基础篇--索引集合类

数组是由名称和索引引用的值构成的有序列表。 JavaScript 中没有明确的数组数据类型。但是&#xff0c;你可以使用预定义的 Array 对象及其方法来处理应用程序中的数组。Array 对象具有以各种方式操作数组的方法&#xff0c;例如连接、反转和排序。它有一个用于确定数组长度的…

md5绕过

文章目录 \\和\\\md5数组绕过科学计数法绕过双md加密md5碰撞Hash长度攻击 下面会以同一道题给大家演示&#xff1a; (题目来源与nssctf) 和 在php代码中我们会看到和&#xff0c;虽然两个都是表示相等&#xff0c;但是在细节上会有所部区别 &#xff1a;是弱比较&#xff0c;只…

0201安装报错-hbase-大数据学习

1 基础环境简介 linux系统&#xff1a;centos&#xff0c;前置安装&#xff1a;jdk、hadoop、zookeeper&#xff0c;版本如下 软件版本描述centos7linux系统发行版jdk1.8java开发工具集hadoop2.10.0大数据生态基础组件zookeeper3.5.7分布式应用程序协调服务hbase2.4.11分布式…

Sora 作者被曝读博期间仅发表两篇论文,我们是否需要重塑科研价值观?

众所周知&#xff0c;在当今学术界&#xff0c;论文数量和产出速度常常被视为研究者生产力和学术成就的重要标尺。笔者也面试过很多博士生候选人&#xff0c;大家普遍会以自己读博期间发表过10几篇甚至几十篇论文而骄傲&#xff0c;很少有候选人会强调自己读博期间虽然发表论文…

UE5.1_TimeLine

UE5.1_TimeLine 问题引入&#xff1a;UE的Timeline可以在一个场景下无限制的使用多少次&#xff1f;一个动画流程的Timeline的时间持续怎么算?TimeLine中嵌套Timeline的做法是否是合理的&#xff1f;

【数据结构】泛型

文章目录 一、什么是泛型二、引出泛型1、语法 三、泛型类的使用1、语法2、示例3、类型推导(Type Inference) 四、裸类型(Raw Type)五、泛型如何编译的六、泛型的上界1、语法2、示例 七、泛型方法八、通配符九、包装类 一、什么是泛型 一般的类和方法&#xff0c;只能使用具体的…

Matlab|基于目标级联法的微网群多主体分布式优化调度

目录 主要内容 1.1 上层微网群模型 1.2 下层微网模型 部分程序 实现效果 下载链接 主要内容 本文复现《基于目标级联法的微网群多主体分布式优化调度》文献的目标级联部分&#xff0c; 建立微网群系统的两级递阶优化调度模型: 上层是微网群能量调度中心优化调度…

java中的字符串比较(题目作示例)

错误的代码 import java.util.Scanner; public class one {public static void main(String[] args) {Scanner scnew Scanner(System.in);String b"47568";int i0;for ( i 0; i <3; i){String asc.next();if(ab){System.out.println("密码正确&#xff0c;登…

C++ 类的前向声明的用法

我们知道C的类应当是先定义&#xff0c;然后使用。但在处理相对复杂的问题、考虑类的组合时&#xff0c;很可能遇到俩个类相互引用的情况&#xff0c;这种情况称为循环依赖。 例如&#xff1a; class A { public:void f(B b);//以B类对象b为形参的成员函数//这里编译错位&…

在外包公司干了4年,技术退步2年...

先说情况&#xff0c;大专毕业&#xff0c;18年通过校招进入湖南某软件公司&#xff0c;干了接近6年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试&#xf…

【Linux】常见的基本指令(中)

在本篇博客中&#xff0c;将会继续介绍Linux的常见的基本指令 一.rmdir指令 rmdir [空文件夹名] 删除空文件夹&#xff08;空目录&#xff09; 二.rm指令 rm [文件名] 删除文件 rm -r 递归删除文件夹&#xff08;目录&#xff09; rm -f 强制…

白话模电:1.绪论与半导体(考研面试常问问题)

一、什么是信号&#xff1f;什么是电信号&#xff1f; 信号反映消息的物理量&#xff0c;电信号是反应电压或电流变化的物理量。 二、什么是模拟信号&#xff1f;什么是数字信号&#xff1f; 模拟信号是时间和数值上均连续的信号&#xff0c;数字信号是时间和数值上均离散的信号…

[HackMyVM]靶场 Zeug

kali:192.168.56.104 主机发现 arp-scan -l # arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:d2:e0:49, IPv4: 192.168.56.104 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.56.1 0a:00:27:00:00:05 (Un…

2024年起该地推行「软考电子证书」!关于软考证书常见问题解答!

近日&#xff0c;安徽省人力资源和社会保障厅发布“关于2024年度安徽省专业技术人员职业资格考试工作计划及有关事项”的通知&#xff1a;从2024年起&#xff0c;推行电子证书&#xff0c;原则上不再发放纸质证书&#xff0c;电子证书与纸质证书具有同等效力。 安徽也推行电子…

Seata 2.x 系列【5】直接部署

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Seata 版本 2.0.0 本系列Spring Boot 版本 3.2.0 本系列Spring Cloud 版本 2023.0.0 源码地址&#xff1a;https://gitee.com/pearl-organization/study-seata-demo 文章目录 1. 概述2. 环境要…

学习嵌入式C语言要掌握到什么程度?

学习嵌入式C语言要掌握到什么程度&#xff1f; 在开始前我分享下我的经历&#xff0c;我刚入行时遇到一个好公司和师父&#xff0c;给了我机会&#xff0c;一年时间从3k薪资涨到18k的&#xff0c; 我师父给了一些 电气工程师学习方法和资料&#xff0c;让我不断提升自己&#…

Java注解介绍

Java注解 注解介绍元注解RetentionTargetDocumentedInherited接口类测试结果 注解介绍 Java注解&#xff08;Annotation&#xff09;是一种元数据&#xff08;Metadata&#xff09;的形式&#xff0c;它可以被添加到Java代码中的类、方法、变量、参数等元素上&#xff0c;以提…