C#MQTT协议服务器与客户端通讯实现(客户端包含断开重连模块)

C#MQTT协议服务器与客户端通讯实现

  • 1 DLL版本
  • 2 服务器
  • 3 客户端

1 DLL版本

MQTTnet.DLL版本-2.7.5.0
基于比较老的项目中应用的DLL,其他更高版本变化可能较大,谨慎参考。

2 服务器

开启服务器
关闭服务器
绑定事件【客户端连接服务器事件】
绑定事件【客户端断开(服务器)连接事件】
绑定事件【客户端订阅主题事件】
绑定事件【客户端退订主题事件】
绑定事件【接收客户端(发送)消息事件】

using System;
using System.Net;
using MQTTnet;
using MQTTnet.Server;namespace Demo_MQTT.Model
{public class ServerModel{private static MqttServer _mqttServer = null;private readonly Action<string> _callbackLog;public ServerModel(Action<string> callbackLog){_callbackLog = callbackLog;}/// <summary>/// 绑定客户端连接服务器事件/// </summary>private void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e){WriteLog($"客户端[{e.Client.ClientId}]已连接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}/// <summary>/// 绑定客户端断开连接事件/// </summary>private void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e){WriteLog($"客户端[{e.Client.ClientId}]已断开连接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}/// <summary>/// 绑定客户端订阅主题事件/// </summary>private void Server_ClientSubscribedTopic(object sensor, MqttClientSubscribedTopicEventArgs e){WriteLog($">>> 客户端{e.ClientId}订阅主题{e.TopicFilter.Topic}");}/// <summary>/// 绑定客户端退订主题事件/// </summary>/// <param name="e"></param>private void Server_ClientUnsubscribedTopic(object sensor, MqttClientUnsubscribedTopicEventArgs e){WriteLog($">>> 客户端{e.ClientId}退订主题{e.TopicFilter}");}/// <summary>/// 绑定接收客户端消息事件/// </summary>private void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e){WriteLog($"接收到{e.ClientId}发送来的消息! {DateTime.Now:yyyy-MM-dd HH:mm:ss} {Environment.NewLine}");}private void WriteLog(string log){_callbackLog?.Invoke(log);}/// <summary>/// 开启服务器/// </summary>/// <param name="ip">IP地址</param>/// <param name="port">端口号</param>public void StartServer(string ip, int port){if (_mqttServer == null){var optionsBuilder = new MqttServerOptionsBuilder().WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)).WithConnectionBacklog(1000).WithDefaultEndpointPort(port);IMqttServerOptions options = optionsBuilder.Build();try{_mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;_mqttServer.ClientConnected += MqttServer_ClientConnected;_mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;_mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;_mqttServer.ClientSubscribedTopic += Server_ClientSubscribedTopic;_mqttServer.ClientUnsubscribedTopic += Server_ClientUnsubscribedTopic;_mqttServer.StartAsync(options);}catch (Exception ex){Console.WriteLine(ex.Message);return;}WriteLog($"MQTT服务器启动成功 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}}/// <summary>/// 关闭服务器/// </summary>public void CloseServer(){_mqttServer?.StopAsync();}}
}

3 客户端

连接服务器
属性:客户端连接状态
客户端断开重连线程
获取所有订阅主题
订阅主题
退订主题
发送消息
绑定事件【客户端连接服务器事件】
绑定事件【客户端断开(服务器)连接事件】
绑定事件【客户端接收消息事件】

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;namespace Demo_MQTT.Model
{public class ClientModel{/// <summary>/// 记录所有订阅主题,用于断开重连时重新订阅主题/// </summary>private readonly List<string> _subscribeTopics = new List<string>();private MqttClient _mqttClient = null;private string _serverIp;private int _nServerPort;private bool _isRunningReConnectThreadStart = false;private string _clienID;/// <summary>/// 接受消息回调函数,参数:主题,消息内容/// </summary>private readonly Action<string, byte[]> _callbackReceived;private readonly Action<string> _callbackLog;/// <summary>/// 构造函数/// </summary>/// <param name="callbackReceived">接受消息回调函数,参数:主题,消息内容</param>/// <param name="callbackLog"></param>public ClientModel(Action<string, byte[]> callbackReceived, Action<string> callbackLog){_callbackReceived = callbackReceived;_callbackLog = callbackLog;}/// <summary>/// 连接服务器/// </summary>private async void ConnectServer(){try{if (_mqttClient == null){_mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;_mqttClient.Connected += (s, a) => WriteLog($"【{_clienID}】已连接到MQTT服务器!");_mqttClient.Disconnected += (s, a) => WriteLog($"【{_clienID}】已断开MQTT连接!");_mqttClient.ApplicationMessageReceived += (sender, args) =>{_callbackReceived?.Invoke(args.ApplicationMessage.Topic, args.ApplicationMessage.Payload);};}if (_mqttClient.IsConnected) return;IMqttClientOptions options = new MqttClientOptions{ChannelOptions = new MqttClientTcpOptions(){Server = _serverIp,Port = _nServerPort},CleanSession = true};_clienID = options.ClientId;await _mqttClient.ConnectAsync(options);if (_mqttClient.IsConnected){ReConnectThreadStart();SubscribeAsync();}}catch (Exception ex){WriteLog("连接到MQTT服务器失败!");}}/// <summary>/// 客户端重连服务器线程-启动/// </summary>/// <returns></returns>private void ReConnectThreadStart(){if (_isRunningReConnectThreadStart) return;if (_mqttClient != null){new Task(() =>{_isRunningReConnectThreadStart = true;Thread.Sleep(5000);while (true){Thread.Sleep(1000);if (!IsConnect){WriteLog($"客户端[{_clienID}]断开连接,尝试重新连接服务器中...");int i;for (i = 0; i < 60; i++){if (IsConnect) break;WriteLog($"尝试第{i + 1}次连接服务器");ConnectServer();Thread.Sleep(1000);if (IsConnect) break;}_isRunningReConnectThreadStart = i < 60;}if (!_isRunningReConnectThreadStart) break;}}).Start();}}private void WriteLog(string log){_callbackLog?.Invoke(log);}/// <summary>/// 客户端连接状态/// </summary>public bool IsConnect => _mqttClient?.IsConnected == true;/// <summary>/// 连接服务器/// </summary>/// <param name="serverIp">服务器IP</param>/// <param name="serverPort">服务器端口</param>/// <param name="topic"></param>public async void ConnectServer(string serverIp, int serverPort){_serverIp = serverIp;_nServerPort = serverPort;await Task.Run(() => { ConnectServer(); });}/// <summary>/// 关闭客户端,断开客户端和服务器的连接/// </summary>public void CloseClient(){_mqttClient.DisconnectAsync();}/// <summary>/// 发送消息/// </summary>/// <param name="topic">发送主题</param>/// <param name="cmd">发送内容</param>[Obsolete("Obsolete")]public void PublishAsync(string topic, string cmd){var bytes = Encoding.UTF8.GetBytes(cmd);var mode = MqttQualityOfServiceLevel.AtMostOnce;var appMsg = new MqttApplicationMessage(topic, bytes, mode, false);_mqttClient.PublishAsync(appMsg);//发送消息}/// <summary>/// 订阅主题/// </summary>/// <param name="topics">主题标识</param>public void SubscribeAsync(params string[] topics){foreach (var topic in topics){if (!_subscribeTopics.Contains(topic)){_subscribeTopics.Add(topic);}}var topicFilters = _subscribeTopics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();_mqttClient?.SubscribeAsync(topicFilters);}/// <summary>/// 退订已订阅主题/// </summary>/// <param name="topics">主题标识</param>public void UnSubscribeAsync(params string[] topics){if (topics == null || topics.Length == 0) return;var topicFilters = topics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();_mqttClient.SubscribeAsync(topicFilters);}/// <summary>/// 获取所有订阅主题/// </summary>public string[] GetAllTopic => _subscribeTopics.ToArray();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/901123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【连载3】基础智能体的进展与挑战综述

基础智能体的进展与挑战综述 从类脑智能到具备可进化性、协作性和安全性的系统 【翻译团队】刘军(liujunbupt.edu.cn) 钱雨欣玥 冯梓哲 李正博 李冠谕 朱宇晗 张霄天 孙大壮 黄若溪 2. 认知 人类认知是一种复杂的信息处理系统&#xff0c;它通过多个专门的神经回路协调运行…

Python语言介绍

Python 是一种高级、通用、解释型的编程语言&#xff0c;由 Guido van Rossum 于 1991 年首次发布。其设计哲学强调代码的可读性和简洁性。 Python通过简洁的语法和强大的生态系统&#xff0c;成为当今最受欢迎的编程语言之一。 一、核心特点 Python 是一种解释型、面向对象、…

什么是回表?哪些数据库存在回表?

目录 一、什么是回表1. 回表的核心流程2. 示例说明3. 回表的性能问题4. 总结 二、哪些数据库会有回表1. MySQL&#xff08;InnoDB&#xff09;2. Oracle3. 其他数据库&#xff08;如 SQL Server、PostgreSQL&#xff09;4. 总结 三、非聚集索引与聚集索引的区别及产生原因1. 聚…

ssh 免密登录服务器(vscode +ssh 免密登录)

每次打开vscode连接服务器都需要输入密码&#xff0c;特别繁琐。 然后自己在网上翻阅了一下教程&#xff0c;发现说的内容比较啰嗦&#xff0c;而且个人感觉非常有误导性倾向。 因此自己直接干脆写一个简便易懂的教程算了。 &#xff08;以经过本人亲测&#xff0c;真实可靠&am…

基于低空经济的无人机操控与维护实训室解决方案

一、低空经济时代下的无人机人才需求 1.1 低空经济发展趋势与政策机遇 在当前经济与科技飞速发展的大背景下&#xff0c;低空经济作为国家战略性新兴产业&#xff0c;正以迅猛之势崛起&#xff0c;展现出无限的潜力与活力。其应用场景极为广泛&#xff0c;涵盖了物流、安防、…

PyTorch实现二维卷积与边缘检测:从原理到实战

本文通过PyTorch实现二维互相关运算、自定义卷积层&#xff0c;并演示如何通过卷积核检测图像边缘。同时&#xff0c;我们将训练一个卷积核参数&#xff0c;使其能够从数据中学习边缘特征。 1. 二维互相关运算的实现 互相关运算&#xff08;Cross-Correlation&#xff09;是卷…

数字政府网络架构建设方案

数字政府网络架构建设方案 一、引言 随着信息技术的快速发展&#xff0c;数字政府建设已成为提升政府治理能力和服务水平的关键。网络架构作为数字政府的核心基础设施&#xff0c;对于保障数据安全、提高服务效率、促进信息共享具有重要意义。本方案旨在为数字政府网络架构建…

Python map函数介绍

在 Python 里&#xff0c;map() 是一个内置函数&#xff0c;其用途是将指定的函数应用于可迭代对象&#xff08;像列表、元组等&#xff09;的每个元素&#xff0c;最终返回一个新的迭代器。此迭代器所包含的元素是原可迭代对象中每个元素经过指定函数处理后的结果。map() 函数…

【服务器端表单字符验证】

文章目录 一、实验目的二、核心代码实现三、调试关键问题四、总结 一、实验目的 掌握JSP表单验证在服务器端的实现技术&#xff0c;实现对用户输入字符的非空及长度为5的验证&#xff0c;返回对应提示信息并优化用户交互。 二、核心代码实现 前端表单 <form action"…

dify windos,linux下载安装部署,提供百度云盘地址

dify下载安装 dify1.0.1 windos安装包百度云盘地址 通过网盘分享的文件&#xff1a;dify-1.0.1.zip 链接: 百度网盘 请输入提取码 提取码: 1234 dify安装包 linux安装包百度云盘地址 通过网盘分享的文件&#xff1a;dify-1.0.1.tar.gz 链接: 百度网盘 请输入提取码 提取码…

C++ Primer 5e 习题2.5: 指出如下字面量常量的类型

Exercise 2.5: Determine the type of each of the following literals. Explain the differences among the literals in each of the four examples: (a) ‘a’, L’a’, “a”, L"a" (b) 10, 10u, 10L, 10uL, 012, 0xC © 3.14, 3.14f, 3.14L (d) 10, 10u, 10…

CFS 调度器两种调度类型普通调度 和 组调度

在 Linux 的 CFS&#xff08;Completely Fair Scheduler&#xff09; 调度器中&#xff0c;确实存在两种调度类型&#xff1a;普通调度 和 组调度。这两种调度类型分别适用于不同的场景&#xff0c;并通过三个关键维度&#xff08;权重、抢占优先级、最大配额&#xff09;来影响…

AF3 ProteinDataset类的_get_masked_sequence方法解读

AlphaFold3 protein_dataset模块 ProteinDataset 类 _get_masked_sequence 方法属于作用是为需要预测的残基生成掩码。该掩码以二进制张量形式呈现,其中 1 代表需要预测的部分,0 代表其他部分。此方法会依据多个参数来选定要掩码的残基,这些参数包含 mask_whole_chains、mas…

【音视频】SDL渲染YUV格式像素

SDL视频显示的流程 实现流程 准备视频文件 准备一个格式为yuv420p&#xff0c;分辨率为320x240的yuv数据&#xff0c;并且将视频文件放入项目构建的目录下&#xff1a; 初始化SDL 初始化SDL的视频模块 //初始化 SDL if(SDL_Init(SDL_INIT_VIDEO)) {fprintf( stderr, "…

关于群晖安装tailscale后无法直链的问题

问题是我局域网的ipv6无法正确获取到ip, 通过命令可以看到ipv6没有ip tailscale netcheck C:\Users\Administrator>tailscale netcheck 2025/04/12 23:43:34 attempting to fetch a DERPMap from https://controlplane.tailscale.comReport:* Time: 2025-04-12T15:43:38.27…

[数据结构]Trie字典树

GPT的介绍 &#x1f9e0; 一句话总结&#xff1a; 字典树是一种专门用来存很多字符串的“超级前缀树”&#xff0c;查找某个字符串或前缀的时候&#xff0c;特别快&#xff01; ✍️ 举个生活例子&#xff08;类比&#xff09;&#xff1a; 你想做一个词典&#xff08;Dictio…

04-算法打卡-数组-二分查找-leetcode(69)-第四天

1 题目地址 69. x 的平方根 - 力扣&#xff08;LeetCode&#xff09;69. x 的平方根 - 给你一个非负整数 x &#xff0c;计算并返回 x 的 算术平方根 。由于返回类型是整数&#xff0c;结果只保留 整数部分 &#xff0c;小数部分将被 舍去 。注意&#xff1a;不允许使用任何内…

AI领域再突破,永洪科技荣获“2025人工智能+创新案例”奖

在2025年的今天&#xff0c;人工智能已从技术概念全面渗透至产业核心。中国作为全球AI技术应用的前沿阵地&#xff0c;正通过“人工智能”行动加速推进技术与实体经济深度融合。 这一背景下&#xff0c;永洪科技凭借其“国内某头部ICT人力资源板块GenAI项目”荣获“2025全国企业…

反序列化漏洞介绍与挖掘指南

目录 反序列化漏洞介绍与挖掘指南 一、漏洞核心原理与危害 二、漏洞成因与常见场景 1. 漏洞根源 2. 高危场景 三、漏洞挖掘方法论 1. 静态分析 2. 动态测试 3. 利用链构造 四、防御与修复策略 1. 代码层防护 2. 架构优化 3. 运维实践 五、工具与资源推荐 总结 反…

从零开始的C++编程 2(类和对象下)

目录 1.构造函数初始化列表 2.类型转换 3.static成员 4.友元 5.内部类 6.匿名对象 1.构造函数初始化列表 ①之前我们实现构造函数时&#xff0c;初始化成员变量主要使⽤函数体内赋值&#xff0c;构造函数初始化还有⼀种⽅式&#xff0c;就是初始化列表&#xff0c;初始化…