充电宝项目中的MQTT(轻量高效的物联网通信协议)

文章目录

  • 补充:HTTP协议
  • MQTT协议
    • MQTT的核心特性
    • MQTT vs HTTP:关键对比
  • EMQX
  • 项目集成EMQX
    • 集成配置
    • 客户端和回调方法
    • 具体接口和方法处理
    • 处理类

补充:HTTP协议

  • HTTP是一种应用层协议,使用TCP作为传输层协议,默认端口是80,基于请求和响应的方式,即客户端发起请求,服务器响应请求并返回数据(HTML,JSON)。在HTTP/1.1中,使用了长连接技术,允许一个连接复用多个请求和响应,减少了TCP三次握手的消耗。
  • HTTP的基本结构
    • **请求行:**包含请求方法(GET, POST等)、请求URL、协议版本。
    • **请求头:**包括各种元数据,如Connection、Host、Content-Type等。
    • **空行:**标识头部与载荷的分界线
    • **请求体:**通常在POST请求中出现,包含请求的具体数据。

  • HTTP的**无状态性:**HTTP是无状态协议,每次请求都是独立的,不会记录上一次请求的任何信息,如果需要记录用户状态,需要额外机制,如:**Cookies:**浏览器在发送请求时,可以携带上次访问时服务器存储的Cookies(小型文本数据),服务器通过这些Cookies来识别用户的身份或维持会话状态。
  • **高开销:**每次请求都需要建立TCP连接,导致网络开销较大,尤其在频繁请求的场景下。
  • 实时性差:HTTP通常是客户端主动发起请求,服务器无法主动推送数据。

MQTT协议

  • MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,专为低带宽、高延迟或不稳定的网络环境设计。使用TCP协议进行传输,端口为1883(非加密)和8883(加密),客户端通过发布(Publish)消息到某个主题(Topic),而其他订阅(Subscribe)该主题的客户端会接收到消息。现已成为物联网(IoT)领域最流行的通信协议之一。

  • **主题(Topic):**消息的标签,决定消息的去向,订阅者根据主题来接收消息。
  • **QoS(Quality of Service)级别:**决定消息传输的可靠性。MQTT支持三个级别的QoS:
    • QoS 0:最多一次发送,不保证消息送达。
    • QoS 1:至少一次发送,确保消息至少送达一次。
    • QoS 2:只有一次发送,确保消息只送达一次。
  • **保留标志:**用于确保客户端在订阅时能接收到最后一条消息。

MQTT基于客户端-服务器架构,其中:

  • 发布者(Publisher):发送消息的客户端
  • 订阅者(Subscriber):接收消息的客户端
  • 代理(Broker):接收所有消息并过滤后分发给相关订阅者的服务器

MQTT的核心特性

  1. 轻量高效:最小化协议开销,报文头仅2字节
  2. 发布/订阅模式:解耦消息生产者和消费者
  3. 三种服务质量(QoS)等级
    • QoS 0:最多一次(可能丢失)
    • QoS 1:至少一次(可能重复)
    • QoS 2:恰好一次(确保可靠)
  4. 持久会话:可恢复中断的连接
  5. 遗嘱消息:客户端异常断开时发送预设消息
  6. 主题过滤:支持多级通配符(#和+)

MQTT vs HTTP:关键对比

特性MQTTHTTP
通信模式发布/订阅请求/响应
连接开销保持长连接(Keep-Alive)通常短连接(可配置Keep-Alive)
消息方向双向通信客户端发起请求
协议开销极小(最小2字节头)较大(包含大量头信息)
实时性高(消息即时推送)低(依赖轮询或WebSocket)
适用场景IoT、实时消息、低带宽环境Web服务、API交互
消息推送服务器可主动推送传统HTTP需客户端轮询
功耗相对较高
安全性支持TLS加密支持HTTPS加密

EMQX

  • EMQX 是一款大规模可弹性伸缩的云原生分布式物联网 MQTT 消息服务器。作为全球最具扩展性的 MQTT 消息服务器,EMQX 提供了高效可靠海量物联网设备连接,能够高性能实时移动与处理消息和事件流数据,帮助您快速构建关键业务的物联网平台与应用。

  • EMQX文档

  • EMQX的docker安装:开始在linux上安装1Panel,然后再应用商店中进行一键安装。
    在这里插入图片描述

  • EMQX特性:

    • 开放源码:基于 Apache 2.0 许可证完全开源,自 2013 年起 200+ 开源版本迭代。
    • MQTT 5.0:100% 支持 MQTT 5.0 和 3.x 协议标准,更好的伸缩性、安全性和可靠性。
    • 海量连接:单节点支持 500 万 MQTT 设备连接,集群可扩展至 1 亿并发 MQTT 连接。
    • 高性能:单节点支持每秒实时接收、移动、处理与分发数百万条的 MQTT 消息。
    • 低时延:基于 Erlang/OTP 软实时的运行时系统设计,消息分发与投递时延低于 1 毫秒。
    • 高可用:采用 Masterless 的大规模分布式集群架构,实现系统高可用和水平扩展。

  • 根据业务流程图可以看出,系统与柜机交互是通过MQTT协议进行
    在这里插入图片描述

项目集成EMQX

集成配置

  1. 引入依赖
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
  1. MqttTest
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttTest {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://ip:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留会话connOpts.setCleanSession(true);// 设置回调client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息内容:" + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}});// 建立连接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 订阅client.subscribe(subTopic);// 消息发布所需参数MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
  1. 配置yaml文件
emqx:client:clientId: xt001username: xxxpassword: xxxserverURI: tcp://ip:1883keepAliveInterval: 10connectionTimeout: 30
  1. Emqx配置对象类(EmqxProperties)
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;@Data
@Component
@ConfigurationProperties(prefix = "emqx.client")
public class EmqxProperties {private String clientId;private String username;private String password;private String serverURI;private int keepAliveInterval;private int connectionTimeout;
}
  1. Emqx常量(EmqxConstants)
/*** Emqx常量信息**/
public class EmqxConstants {/** 充电宝插入,柜机发布Topic消息, 服务器监听消息 */public final static String TOPIC_POWERBANK_CONNECTED = "/sys/powerBank/connected";/** 用户扫码,服务器发布Topic消息 柜机监听消息  */public final static String TOPIC_SCAN_SUBMIT = "/sys/scan/submit/%s";/** 充电宝弹出,柜机发布Topic消息,服务器监听消息  */public final static String TOPIC_POWERBANK_UNLOCK = "/sys/powerBank/unlock";/** 柜机属性上报,服务器监听消息  */public final static String TOPIC_PROPERTY_POST = "/sys/property/post";
}

客户端和回调方法

  1. EmqxClientWrapper
import com.share.device.emqx.callback.OnMessageCallback;
import com.share.device.emqx.config.EmqxProperties;
import com.share.device.emqx.constant.EmqxConstants;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class EmqxClientWrapper {@Autowiredprivate EmqxProperties emqxProperties;@Autowiredprivate MqttClient client;@Autowiredprivate OnMessageCallback onMessageCallback;@PostConstructprivate void init() {MqttClientPersistence mqttClientPersistence = new MemoryPersistence();try {//新建客户端 参数:MQTT服务的地址,客户端名称,持久化client = new MqttClient(emqxProperties.getServerURI(), emqxProperties.getClientId(), mqttClientPersistence);// 设置回调client.setCallback(onMessageCallback);// 建立连接connect();} catch (MqttException e) {log.info("MqttClient创建失败");throw new RuntimeException(e);}}public Boolean connect() {// 设置连接的配置try {client.connect(mqttConnectOptions());log.info("连接成功");// 订阅String[] topics = {EmqxConstants.TOPIC_POWERBANK_CONNECTED, EmqxConstants.TOPIC_POWERBANK_UNLOCK, EmqxConstants.TOPIC_PROPERTY_POST};client.subscribe(topics);return true;} catch (MqttException e) {log.info("连接失败");e.printStackTrace();}return false;}/*创建MQTT配置类*/private MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(emqxProperties.getUsername());options.setPassword(emqxProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);//是否自动重新连接options.setCleanSession(true);//是否清除之前的连接信息options.setConnectionTimeout(emqxProperties.getConnectionTimeout());//连接超时时间options.setKeepAliveInterval(emqxProperties.getKeepAliveInterval());//心跳return options;}/*** 发布消息* @param topic* @param data*/public void publish(String topic, String data) {try {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(2);client.publish(topic, message);} catch (MqttException e) {log.info("消息发布失败");e.printStackTrace();}}}
  1. 回调消息处理类 :OnMessageCallback
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.stereotype.Component;@Slf4j
    @Component
    public class OnMessageCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}@Override
    public void messageArrived(String topic, MqttMessage message) {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息内容:" + new String(message.getPayload()));try {// 根据主题选择不同的处理逻辑MassageHandler massageHandler = messageHandlerFactory.getMassageHandler(topic);if(null != massageHandler) {String content = new String(message.getPayload());massageHandler.handleMessage(JSONObject.parseObject(content));}} catch (Exception e) {e.printStackTrace();log.error("mqtt消息异常:{}", new String(message.getPayload()));}
    }@Override
    public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());
    }
    }
    

具体接口和方法处理

  1. 定义策略接口:MassageHandler
public interface MassageHandler {/*** 策略接口* @param message*/void handleMessage(JSONObject message);
}
  1. 具体Handler处理
import java.lang.annotation.*;
// 自定义注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GuiguEmqx {String topic();
}
  1. 充电宝插入处理类:PowerBankConnectedHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_CONNECTED)
public class PowerBankConnectedHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
  1. 充电宝弹出处理类:PowerBankUnlockHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_UNLOCK)
public class PowerBankUnlockHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
  1. 属性上报:PropertyPostHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_PROPERTY_POST)
public class PropertyPostHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}

处理类

  1. MessageHandlerFactory
public interface MessageHandlerFactory {MassageHandler getMassageHandler(String topic);
}
  1. MessageHandlerFactoryImpl
@Service
public class MessageHandlerFactoryImpl implements MessageHandlerFactory, ApplicationContextAware {private Map<String, MassageHandler> handlerMap = new HashMap<>();/*** 初始化bean对象* @param ioc*/@Overridepublic void setApplicationContext(ApplicationContext ioc) {// 获取对象Map<String, MassageHandler> beanMap = ioc.getBeansOfType(MassageHandler.class);for (MassageHandler massageHandler : beanMap.values()) {GuiguEmqx guiguEmqx = AnnotatedElementUtils.findAllMergedAnnotations(massageHandler.getClass(), GuiguEmqx.class).iterator().next();if (null != guiguEmqx) {String topic = guiguEmqx.topic();// 初始化到maphandlerMap.put(topic, massageHandler);}}}@Overridepublic MassageHandler getMassageHandler(String topic) {return handlerMap.get(topic);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/79270.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【iOS】UIPageViewController学习

UIPageViewController学习 前言创建一个UIPageViewController最简单的使用 UIPageViewController的方法说明&#xff1a;效果展示 UIPageViewController的协议方法 前言 笔者最近在写项目时想实现一个翻书效果&#xff0c;上网学习到了UIPageViewController今天写本篇博客总结…

Linux搭建环境:从零开始掌握基础操作(四)

​ ​ 您好&#xff0c;我是程序员小羊&#xff01; 前言 软件测试第一步就是搭建测试环境&#xff0c;如何搭建好测试环境&#xff0c;需要具备两项的基础知识&#xff1a; 1、Linux 命令: 软件测试第一个任务, 一般都需要进行环境搭建, 一部分&#xff0c;环境搭建内容是在服…

一天一个java知识点----Tomcat与Servlet

认识BS架构 静态资源&#xff1a;服务器上存储的不会改变的数据&#xff0c;通常不会根据用户的请求而变化。比如&#xff1a;HTML、CSS、JS、图片、视频等(负责页面展示) 动态资源&#xff1a;服务器端根据用户请求和其他数据动态生成的&#xff0c;内容可能会在每次请求时都…

YOLOV8 OBB 海思3516训练流程

YOLOV8 OBB 海思3516训练流程 目录 1、 下载带GPU版本的torch(可选) 1 2、 安装 ultralytics 2 3、 下载pycharm 社区版 2 4、安装pycharm 3 5、新建pycharm 工程 3 6、 添加conda 环境 4 7、 训练代码 5 9、配置Ymal 文件 6 10、修改网络结构 9 11、运行train.py 开始训练模…

【深度学习】花书第18章——配分函数

直面配分函数 许多概率模型&#xff08;通常是无向图模型&#xff09;由一个未归一化的概率分布 p ~ ( x , θ ) \tilde p(\mathbf x,\theta) p~​(x,θ)定义。我们必须通过除以配分函数 Z ( θ ) Z(\pmb{ \theta}) Z(θ)来归一化 p ~ \tilde p p~​。以获得一个有效的概率分…

工作记录1

日常总结、灵感记录、学习要点。持续记录 学海无涯,再好的记性也比不过烂笔头,记录一下学习日常、灵感、要点。 前言:最近看见一个博文,很有感触,是某个大佬自己运营的网站,分享了他的各种经验文章和自身的一些笔记。本人还没有他这么屌,所以还是先在CSDN上小试牛刀吧…

Spring Boot(二十一):RedisTemplate的String和Hash类型操作

RedisTemplate和StringRedisTemplate的系列文章详见&#xff1a; Spring Boot&#xff08;十七&#xff09;&#xff1a;集成和使用Redis Spring Boot&#xff08;十八&#xff09;&#xff1a;RedisTemplate和StringRedisTemplate Spring Boot&#xff08;十九&#xff09;…

智能指针之设计模式1

本文探讨一下智能指针和GOF设计模式的关系&#xff0c;如果按照设计模式的背后思想来分析&#xff0c;可以发现围绕智能指针的设计和实现有设计模式的一些思想体现。当然&#xff0c;它们也不是严格意义上面向对象的设计模式&#xff0c;毕竟它们没有那么分明的类层次体系&…

中间件--ClickHouse-1--基础介绍(列式存储,MPP架构,分布式计算,SQL支持,向量化执行,亿万级数据秒级查询)

1、概述 ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。它由俄罗斯的互联网巨头Yandex为解决其内部数据分析需求而开发&#xff0c;并于2016年开源。专为大规模数据分析&#xff0c;实时数据分析和复杂查询设计&#xff0c;具有高性能、实时数据和可扩展性等…

Go之Slice和数组:深入理解底层设计与最佳实践

在Go语言中&#xff0c;数组&#xff08;Array&#xff09;和切片&#xff08;Slice&#xff09;是两种看似相似却本质不同的数据结构。本文将深入剖析它们的底层实现机制&#xff0c;并结合实际代码示例&#xff0c;帮助开发者掌握核心差异和使用场景。 一、基础概念&#xff…

力扣热题100——普通数组(不普通)

普通数组但一点不普通&#xff01; 最大子数组和合并区间轮转数组除自身以外数组的乘积缺失的第一个正数 最大子数组和 这道题是非常经典的适用动态规划解决题目&#xff0c;但同时这里给出两种解法 动态规划、分治法 那么动态规划方法大家可以在我的另外一篇博客总结中看到&am…

矩阵基础+矩阵转置+矩阵乘法+行列式与逆矩阵

GPU渲染过程 矩阵 什么是矩阵&#xff08;Matrix&#xff09; 向量 &#xff08;3&#xff0c;9&#xff0c;88&#xff09; 点乘&#xff1a;计算向量夹角 叉乘&#xff1a;计算两个向量构成平面的法向量。 矩阵 矩阵有3行&#xff0c;2列&#xff0c;所以表示为M32 获取固…

MySQL之text字段详细分类说明

在 MySQL 中&#xff0c;TEXT 是用来存储大量文本数据的数据类型。TEXT 类型可以存储非常长的字符串&#xff0c;比 VARCHAR 类型更适合存储大块的文本数据。TEXT 数据类型分为以下几个子类型&#xff0c;每个子类型用于存储不同大小范围的文本数据&#xff1a; TINYTEXT: 可以…

超详细!Android 面试题大汇总与深度解析

一、Java 与 Kotlin 基础 1. Java 的多态是如何实现的&#xff1f; 多态是指在 Java 中&#xff0c;同一个行为具有多个不同表现形式或形态的能力。它主要通过方法重载&#xff08;Overloading&#xff09;和方法重写&#xff08;Overriding&#xff09;来实现。 方法重载&a…

如何提高webrtc操作跟手时间,降低延迟

第一次做webrtc项目&#xff0c;操作延迟&#xff0c;一直是个问题&#xff0c;多次调试都不能达到理想效果。偶尔发现提高jitterBuffer时间可以解决此问题。关键代码 const _setJitter (values: number) > { const receives peerConnection.getReceivers();receives.f…

语音合成(TTS)从零搭建一个完整的TTS系统-第一节-效果演示

一、概述 语音合成又叫文字转语音&#xff08;TTS-text to speech &#xff09;&#xff0c;本专题我们记录从零搭建一个完整的语音合成系统&#xff0c;包括文本前端、声学模型和声码器&#xff0c;从模型训练到系统的工程化实现&#xff0c;模型可以部署在手机等嵌入式设备上…

实验三 I/O地址译码

一、实验目的 掌握I/O地址译码电路的工作原理。 二、实验电路 实验电路如图1所示&#xff0c;其中74LS74为D触发器&#xff0c;可直接使用实验台上数字电路实验区的D触发器&#xff0c;74LS138为地址译码器&#xff0c; Y0&#xff1a;280H&#xff5e;287H&…

Linux 使用Nginx搭建简易网站模块

网站需求&#xff1a; 一、基于域名[www.openlab.com](http://www.openlab.com)可以访问网站内容为 welcome to openlab ​ 二、给该公司创建三个子界面分别显示学生信息&#xff0c;教学资料和缴费网站&#xff0c;基于[www.openlab.com/student](http://www.openlab.com/stud…

MyBatis 如何使用

1. 环境准备 添加依赖&#xff08;Maven&#xff09; 在 pom.xml 中添加 MyBatis 和数据库驱动依赖&#xff1a; <dependencies><!-- MyBatis 核心库 --><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId&g…

ArkTS组件的三个通用(通用事件、通用属性、通用手势)

文章目录 通用事件点击事件 onClick触摸事件 onTouch挂载、卸载事件拖拽事件按键事件 onKeyEvent焦点事件鼠标事件悬浮事件组件区域变化事件 onAreaChange组件尺寸变化事件组件可见区域变化事件组件快捷键事件自定义事件分发自定义事件拦截 通用属性尺寸设置位置设置布局约束边…