Java 使用 EMQX 实现物联网 MQTT 通信

一、介绍

1、MQTT

MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

特点:
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
对负载内容屏蔽的消息传输;
使用 TCP/IP 提供网络连接;
有三种消息发布服务质量:
小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

2、EMQX

EMQX 是一个「无限连接,任意集成,随处运行」大规模分布式物联网接入平台。
EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。附下载地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下载对应版本运行
在这里插入图片描述

优势:
海量连接:单节点支持 500 万 MQTT 设备连接,集群可水平扩展至支持 1 亿并发的 MQTT 连接。
高可靠:弹性伸缩,无单点故障。内置 RocksDB 可靠地持久化 MQTT 消息,确保无数据损失。
数据安全:端到端数据加密(支持国密),细粒度访问控制,保障数据安全,满足企业合规需求。
多协议:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或专有协议连接任何设备。
高性能:单节点支持每秒实时接收、处理与分发数百万条的 MQTT 消息。毫秒级消息交付时延。
易运维:图形化配置、操作与管理,实时监测运行状态。支持 MQTT 跟踪进行端到端问题分析。

3、Mria 集群架构​

支持全新的 Mria 集群架构,在此架构下 EMQX 水平扩展性得到指数级提升,单个集群可以轻松支持 1 亿 MQTT 连接,这使得 EMQX 5.0 成为目前全球最具扩展性的 MQTT Broker。

在构建满足用户业务需求的更大规模集群的同时,Mria 架构还能够降低大规模部署下的脑裂风险以及脑裂后的影响,以提供更加稳定可靠的物联网数据接入服务。

具体可以查看官方文档: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html

4、MQTTX

MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。

在这里插入图片描述

主要功能
采用聊天界面设计,使得操作更加简单明了
跨平台兼容,支持在 Windows,macOS,Linux 系统上运行
100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 协议
订阅的 MQTT 主题支持自定义颜色标签
支持单向和双向 SSL 认证,同时支持 CA 和自签名证书
支持通过 WebSocket 连接 MQTT 服务器
支持 Hex, Base64, JSON, Plaintext 等 Payload 格式转换
自定义脚本支持模拟 MQTT 发布/订阅测试
提供完整的日志记录功能
多语言支持:简体中文、英语、日语、土耳其语及匈牙利语 ??? ??? ??? ??? ???
自由切换 Light、Dark、Night 三种主题模式

二、实战

1、引入maven依赖:

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>
# EMQX配置
emqx:# EMQX服务地址,端口号默认18083url: http://127.0.0.1:18083# 认证用户名username: admin# 密码password: admin123456
/*** EMQX 登录账号密码配置*/@Data
@Configuration
@ConfigurationProperties(prefix = "emqx")
public class EmqxConfig {private String url;private String username;private String password;
}
spring:# MQTT配置mqtt:# MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开host-url: tcp://127.0.0.1:1883# 用户名username: admin# 密码password: admin123456# 客户端id(不能重复)client-id: real-mqtt-client# MQTT默认的消息推送主题,实际可在调用接口时指定default-topic: topic
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
@Data
public class MqttConfig {private String username;private String password;private String hostUrl;private String clientId;private String defaultTopic;
}

MQTT客户端连接工厂类

@Slf4j
@Component
public class MqttFactory {public static ConcurrentHashMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();@Autowiredprivate MqttConfig mqttConfig;@Autowiredprivate RealPersonAccessDeviceMapper realPersonAccessDeviceMapper;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init() {String mqttStartFlag = ParamResolver.getStr(RealCommonConstants.MQTT_START_FLAG);if (StrUtil.equals(mqttStartFlag, CommonConstants.SYS_YES_NO_Y)) {// 初始化订阅主题initSubscribeTopic(getInstance());}}/*** 初始化客户端*/public MqttClient getInstance() {MqttClient client = null;if (clientMap.get(mqttConfig.getClientId()) == null) {try {client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId());// MQTT配置对象MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// 设置自动重连, 其它具体参数可以查看MqttConnectOptionsmqttConnectOptions.setAutomaticReconnect(true);// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接// mqttConnectOptions.setCleanSession(true);// 设置超时时间 单位为秒mqttConnectOptions.setConnectionTimeout(10);mqttConnectOptions.setUserName(mqttConfig.getUsername());mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());// mqttConnectOptions.setServerURIs(new String[]{url});// 设置会话心跳时间 单位为秒mqttConnectOptions.setKeepAliveInterval(10);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。// mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);if (!client.isConnected()) {client.connect(mqttConnectOptions);}client.setCallback(new MqttCallBack());log.info("MQTT创建client成功={}", JSONObject.toJSONString(client));clientMap.put(mqttConfig.getClientId(), client);} catch (MqttException e) {log.error("MQTT连接消息服务器[{}]失败", mqttConfig.getClientId() + "-" + mqttConfig.getHostUrl());}} else {client = clientMap.get(mqttConfig.getClientId());log.info("MQTT从map里获取到client,clientId=" + mqttConfig.getClientId());// TODO 已采用自动重连策略
//			log.info("MQTT从map里获取到client={}", JSONObject.toJSONString(client));
//			if (!client.isConnected()) {
//				initSubscribeTopic(client);// 如果缓存里的client已经断开,则清除该缓存,再重新创建客户端连接
//				clientMap.remove(mqttConfig.getClientId());
//				this.getInstance();
//			}}return client;}/*** 初始化订阅主题* <p>* 消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息*/public void initSubscribeTopic(MqttClient client) {// 查询所有宇泛mqtt设备并订阅主题List<RealPersonAccessDevice> deviceList = realPersonAccessDeviceMapper.selectList(Wrappers.<RealPersonAccessDevice>lambdaQuery().eq(RealPersonAccessDevice::getDeviceProducerType, RealCommonConstants.DeviceProducerType.UNIUBI.getValue()).and(wrapper -> wrapper.eq(RealPersonAccessDevice::getProtocolType, RealCommonConstants.ProtocolType.MQTT.getValue()).or().eq(RealPersonAccessDevice::getProtocolType, RealCommonConstants.ProtocolType.UMQTT.getValue())).eq(RealPersonAccessDevice::getStatus, CommonConstants.STATUS_ENABLE));if (CollectionUtil.isNotEmpty(deviceList)) {// 订阅设备发布消息主题List<String> upstreamTopics = new ArrayList<>();List<Integer> upstreamQos = new ArrayList<>();for (RealPersonAccessDevice device : deviceList) {if (StrUtil.equals(device.getProtocolType(), RealCommonConstants.ProtocolType.MQTT.getValue())) {upstreamTopics.add(String.format(MqttCommonConstants.DEFAULT, device.getDeviceNo()));upstreamQos.add(1);upstreamTopics.add(String.format(MqttCommonConstants.UPSTREAM, device.getDeviceNo()));upstreamQos.add(0);upstreamTopics.add(String.format(MqttCommonConstants.WILL, device.getDeviceNo()));upstreamQos.add(1);} else if (StrUtil.equals(device.getProtocolType(), RealCommonConstants.ProtocolType.UMQTT.getValue())) {upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, device.getDeviceNo()));upstreamQos.add(1);upstreamTopics.add(String.format(UMqttCommonConstants.ONLINE));upstreamQos.add(1);upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, device.getDeviceNo()));upstreamQos.add(2);upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, device.getDeviceNo()));upstreamQos.add(0);}}try {client.subscribe(upstreamTopics.toArray(new String[upstreamTopics.size()]), upstreamQos.stream().mapToInt(Integer::intValue).toArray());} catch (MqttException e) {e.printStackTrace();}}}}

业务方法

@Slf4j
@Data
@Configuration
public class UMqttClientService {private final MqttFactory mqttFactory;private final StringRedisTemplate redisTemplate;/*** 订阅主题*/public void subscribeTopic(String deviceNo) {try {// 订阅设备发布消息主题List<String> upstreamTopics = new ArrayList<>();upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));upstreamTopics.add(UMqttCommonConstants.ONLINE);upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));int[] upstreamQos = {1, 1, 2, 0};mqttFactory.getInstance().subscribe(upstreamTopics.toArray(new String[0]), upstreamQos);} catch (MqttException e) {e.printStackTrace();}}/*** 取消订阅主题*/public void stopSubscribeTopic(String deviceNo) {try {// 取消订阅设备发布消息主题List<String> upstreamTopics = new ArrayList<>();upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));upstreamTopics.add(UMqttCommonConstants.ONLINE);upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));mqttFactory.getInstance().unsubscribe(upstreamTopics.toArray(new String[0]));} catch (MqttException e) {e.printStackTrace();}}/*** 断开连接*/public void disConnect() {try {mqttFactory.getInstance().disconnect();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅主题*/public void subscribe(String topic, int qos) {try {mqttFactory.getInstance().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}/*** 发布请求设备消息** @param deviceNo 设备编号* @param message  消息*/public void publish(String deviceNo, String message) {publish(1, false, String.format(UMqttCommonConstants.REQUEST, deviceNo), message);}/*** 发布请求设备消息*/public void publish(UMqttPublishDate publishDate) {// 将消息id和方法名存到redis中:缓存3分钟redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);publish(1, false, String.format(UMqttCommonConstants.REQUEST, publishDate.getDeviceNo()), publishDate.getMessage());}/*** 发布响应设备消息*/public void publishResponse(UMqttPublishDate publishDate) {// 将消息id和方法名存到redis中:缓存3分钟redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);publish(1, false, String.format(UMqttCommonConstants.RESPONSE, publishDate.getDeviceNo()), publishDate.getMessage());}/*** 发布消息** @param qos      qos* @param retained retained* @param topic    主题* @param message  消息*/public void publish(int qos, boolean retained, String topic, String message) {log.info("发布消息topic:" + topic);log.info("发布消息message:" + message);MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);mqttMessage.setPayload(message.getBytes());//主题的目的地,用于发布/订阅信息MqttTopic mqttTopic = mqttFactory.getInstance().getTopic(topic);//提供一种机制来跟踪消息的传递进度//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度MqttDeliveryToken token;try {//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。token = mqttTopic.publish(mqttMessage);token.waitForCompletion();} catch (MqttException e) {e.printStackTrace();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/78696.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第37章_瑞萨MCU零基础入门系列教程之DAC数模转换模块

本教程基于韦东山百问网出的 DShanMCU-RA6M5开发板 进行编写&#xff0c;需要的同学可以在这里获取&#xff1a; https://item.taobao.com/item.htm?id728461040949 配套资料获取&#xff1a;https://renesas-docs.100ask.net 瑞萨MCU零基础入门系列教程汇总&#xff1a; ht…

MATLAB基础-MAT文件的读写操作

简介 MAT文件是MATLAB格式的双精度二进制数据文件&#xff0c;由MATLAB软件创建&#xff0c;可以使用MATLAB软件再其他计算机上以其他浮点格式读取&#xff0c;同时也可以使用其他软件通过MATLAB的应用程序接口来进行读写操作。如果只是再MATLAB环境中处理数据&#xff0c;使用…

【深度学习】树莓派Zero w深度学习模型Python推理

在机器学习开发过程中&#xff0c;当模型训练好后&#xff0c;接下来就要进行模型推理了&#xff0c;根据部署环境可分为三类场景&#xff1a; 边缘计算&#xff1a;一般指手机&#xff0c;嵌入式设备&#xff0c;直接在数据生成的设备上进行推理&#xff0c;因为能避免将采集…

SkyWalking安装部署

一、概念 1、什么是 APM 系统&#xff1f; APM&#xff08;Application Performance Management&#xff09;即应用性能管理系统&#xff0c;是对企业系统即时监控以实现对应用程序性能管理和故障管理的系统化的解决方案。应用性能管理&#xff0c;主要指对企业的关键业务应用…

不使用辅助变量的前提下实现两个变量的交换

package operator; //不用第三个辅助变量&#xff0c;实现两个数的交换 public class Demo08 {public static void change(int a, int b){a ab;b a-b;a a-b;System.out.println(a);System.out.println(b);}public static void main(String[] args) {change(900,3000);} }后续…

UMA 2 - Unity Multipurpose Avatar☀️七.UMA API介绍 : 基本API与保存加载配置

文章目录 🟥 UMA Data DNA参数引用位置🟥 UMA API介绍🟥 UMA Data DNA参数引用位置 我们想通过代码去控制如图所示参数,达到捏脸的目的.下面就是可以控制的代码: _dna["headSize"].Set(1); _avatar.BuildCharacter();我们观察发现操控代码类似Material去设置…

三分钟学会一个新技能——使用Java操作Redis

目录 1、前置准备操作 1.1、为什么要进行前置准备操作 1.2、本地如何访问到云服务上Redis的6379端口号 1.3、配置步骤&#xff1a; 2、配置后本地主机如何操作 3、常用命令举例 3.1、通用命令举例 3.2、string相关命令举例 3.3、hash相关命令举例 3.4、list相关命令…

Unity下如何实现RTMP或RTSP播放端录像?

好多开发者问我们&#xff0c;Unity环境下&#xff0c;除了RTSP或RTMP的播放&#xff0c;如果有录像诉求&#xff0c;怎么实现&#xff1f;实际上录像相对播放来说&#xff0c;更简单一些&#xff0c;因为不涉及到绘制&#xff0c;只要拉流下来数据&#xff0c;直接写mp4文件就…

云原生Kubernetes:pod资源管理与配置

目录 一、理论 1.pod 2.pod容器分类 3.镜像拉取策略 4.pod 的重启策略 二、实验 1.Pod容器的分类 2.镜像拉取策略 三、问题 1.apiVersion 报错 2.pod v1版本资源未注册 3.格式错误 4.取行显示指定pod信息 四、总结 一、理论 1.pod (1) 概念 Pod是kubernetes中…

【LeetCode-中等题】78. 子集

文章目录 组合并集问题汇总&#xff1a;题目方法一&#xff1a;动态规划方法二&#xff1a;递归加回溯(关键----startIndex) 组合并集问题汇总&#xff1a; 1、子集去重版本 2、组合非去重版本 3、组合去重版本 题目 注意&#xff1a;这里的nums数组里面的元素是各不相同的&a…

(文末赠书)我为什么推荐应该人手一本《人月神话》

能点进来的朋友&#xff0c;说明你肯定是计算机工作的朋友或者对这本书正在仔细琢磨着的朋友。 文章目录 1、人人都会编程的时代&#xff0c;我们如何留存?2、小故事说明项目管理着为什么必看这本书3、如何评价《人月神话&#xff1a;纪念典藏版》4、本书的目录&#xff08;好…

【操作系统】进程的通信IPC

进程通信是指进程之间的信息交换。 低级通信方式&#xff1a;PV操作 高级通信方式&#xff1a;1.共享存储2.消息传递3.管道通信 共享存储 低级数据结构共享&#xff0c;高级存储区共享。 对共享空间进行读写操作时&#xff0c;需要用到互斥工具。 消息传递 利用发送消息和…

如何选择安全稳定的大文件传输软件平台,企业传输必看

在当今的信息时代&#xff0c;大文件传输是企业间合作、项目交付、数据备份等场景中不可或缺的需求。然而&#xff0c;大文件传输也面临着诸多挑战&#xff0c;比如速度慢、不稳定、不安全等&#xff0c;给企业带来了不少困扰和风险。那么&#xff0c;如何选择一款安全稳定的大…

自然语言处理应用(一):情感分析

情感分析 随着在线社交媒体和评论平台的快速发展&#xff0c;大量评论的数据被记录下来。这些数据具有支持决策过程的巨大潜力。 情感分析&#xff08;sentiment analysis&#xff09;研究人们在文本中 &#xff08;如产品评论、博客评论和论坛讨论等&#xff09;“隐藏”的情…

echarts静态饼图

<div class"cake"><div id"cakeChart"></div></div> import * as echarts from "echarts";mounted() {this.$nextTick(() > {this.getCakeEcharts()})},methods: {// 饼状图getCakeEcharts() {let cakeChart echart…

UE5 Foliage地形植被实例删不掉选不中问题

目前问题测试发生在5.2.1上 地形上先填充后刷的植被删不掉 首先这个就是bug&#xff0c;大概看到说是5.3上能解决了&#xff0c;对此我只能吐槽ue5上地形植被bug太多了 什么nanite还能产生bug&#xff0c;不过这次又不是&#xff0c;整个删掉instance可以删除所有植被&#…

uniapp微信小程序《隐私保护协议》弹窗处理流程

背景 《关于小程序隐私保护指引设置的公告》 《小程序隐私协议开发指南》 流程 1.第一步 必须设置且审核通过&#xff01;&#xff01;&#xff01; 2.第二步 uniapp在manifest.json中添加&#xff01;&#xff01;&#xff01; /* 在 2023年9月15号之前&#xff0c;在 ap…

Leetcode算法入门与数组丨3. 数组基础

文章目录 前言1 数组简介2 数组的基本操作2.1 访问元素2.2 查找元素2.3 插入元素2.4 改变元素2.5 删除元素 3 总结task03task04 前言 Datawhale组队学习丨9月Leetcode算法入门与数组丨打卡笔记 这篇博客是一个 入门型 的文章&#xff0c;主要是自己学习的一个记录。 内容会参…

SSM SpringBoot vue快递柜管理系统

SSM SpringBoot vue快递柜管理系统 系统功能 登录 注册 个人中心 快递员管理 用户信息管理 用户寄件管理 配送信息管理 寄存信息管理 开发环境和技术 开发语言&#xff1a;Java 使用框架: SSM(Spring SpringMVC Mybaits)或SpringBoot 前端: vue 数据库&#xff1a;Mys…

【GAMES202】Real-Time Ray Tracing 1—实时光线追踪1

一、前言 这篇我们开始新的话题—Real-Time Ray Tracing简称RTRT&#xff0c;也就是实时光线追踪&#xff0c;关于光线追踪&#xff0c;我们已经不止一次提到过它的优点&#xff0c;无论是软阴影还是全局光照&#xff0c;光线追踪都很容易做&#xff0c;唯一的缺点就是速度太慢…