文章目录
- 补充:HTTP协议
- MQTT协议
- MQTT的核心特性
- MQTT vs HTTP:关键对比
- EMQX
- 项目集成EMQX
- 集成配置
- 客户端和回调方法
- 具体接口和方法处理
- 处理类
补充:HTTP协议
- HTTP是一种应用层协议,使用TCP作为传输层协议,默认端口是80,基于请求和响应的方式,即客户端发起请求,服务器响应请求并返回数据(HTML,JSON)。在HTTP/1.1中,使用了长连接技术,允许一个连接复用多个请求和响应,减少了TCP三次握手的消耗。
- HTTP的基本结构
- **请求行:**包含请求方法(GET, POST等)、请求URL、协议版本。
- **请求头:**包括各种元数据,如Connection、Host、Content-Type等。
- **空行:**标识头部与载荷的分界线
- **请求体:**通常在POST请求中出现,包含请求的具体数据。
- HTTP的**无状态性:**HTTP是无状态协议,每次请求都是独立的,不会记录上一次请求的任何信息,如果需要记录用户状态,需要额外机制,如:**Cookies:**浏览器在发送请求时,可以携带上次访问时服务器存储的Cookies(小型文本数据),服务器通过这些Cookies来识别用户的身份或维持会话状态。
- **高开销:**每次请求都需要建立TCP连接,导致网络开销较大,尤其在频繁请求的场景下。
- 实时性差:HTTP通常是客户端主动发起请求,服务器无法主动推送数据。
MQTT协议
- MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,专为低带宽、高延迟或不稳定的网络环境设计。使用TCP协议进行传输,端口为1883(非加密)和8883(加密),客户端通过发布(Publish)消息到某个主题(Topic),而其他订阅(Subscribe)该主题的客户端会接收到消息。现已成为物联网(IoT)领域最流行的通信协议之一。
- **主题(Topic):**消息的标签,决定消息的去向,订阅者根据主题来接收消息。
- **QoS(Quality of Service)级别:**决定消息传输的可靠性。MQTT支持三个级别的QoS:
- QoS 0:最多一次发送,不保证消息送达。
- QoS 1:至少一次发送,确保消息至少送达一次。
- QoS 2:只有一次发送,确保消息只送达一次。
- **保留标志:**用于确保客户端在订阅时能接收到最后一条消息。
MQTT基于客户端-服务器架构,其中:
- 发布者(Publisher):发送消息的客户端
- 订阅者(Subscriber):接收消息的客户端
- 代理(Broker):接收所有消息并过滤后分发给相关订阅者的服务器
MQTT的核心特性
- 轻量高效:最小化协议开销,报文头仅2字节
- 发布/订阅模式:解耦消息生产者和消费者
- 三种服务质量(QoS)等级:
- QoS 0:最多一次(可能丢失)
- QoS 1:至少一次(可能重复)
- QoS 2:恰好一次(确保可靠)
- 持久会话:可恢复中断的连接
- 遗嘱消息:客户端异常断开时发送预设消息
- 主题过滤:支持多级通配符(#和+)
MQTT vs HTTP:关键对比
特性 | MQTT | HTTP |
---|---|---|
通信模式 | 发布/订阅 | 请求/响应 |
连接开销 | 保持长连接(Keep-Alive) | 通常短连接(可配置Keep-Alive) |
消息方向 | 双向通信 | 客户端发起请求 |
协议开销 | 极小(最小2字节头) | 较大(包含大量头信息) |
实时性 | 高(消息即时推送) | 低(依赖轮询或WebSocket) |
适用场景 | IoT、实时消息、低带宽环境 | Web服务、API交互 |
消息推送 | 服务器可主动推送 | 传统HTTP需客户端轮询 |
功耗 | 低 | 相对较高 |
安全性 | 支持TLS加密 | 支持HTTPS加密 |
EMQX
-
EMQX 是一款大规模可弹性伸缩的云原生分布式物联网 MQTT 消息服务器。作为全球最具扩展性的 MQTT 消息服务器,EMQX 提供了高效可靠海量物联网设备连接,能够高性能实时移动与处理消息和事件流数据,帮助您快速构建关键业务的物联网平台与应用。
-
EMQX文档
-
EMQX的docker安装:开始在linux上安装1Panel,然后再应用商店中进行一键安装。
-
EMQX特性:
- 开放源码:基于 Apache 2.0 许可证完全开源,自 2013 年起 200+ 开源版本迭代。
- MQTT 5.0:100% 支持 MQTT 5.0 和 3.x 协议标准,更好的伸缩性、安全性和可靠性。
- 海量连接:单节点支持 500 万 MQTT 设备连接,集群可扩展至 1 亿并发 MQTT 连接。
- 高性能:单节点支持每秒实时接收、移动、处理与分发数百万条的 MQTT 消息。
- 低时延:基于 Erlang/OTP 软实时的运行时系统设计,消息分发与投递时延低于 1 毫秒。
- 高可用:采用 Masterless 的大规模分布式集群架构,实现系统高可用和水平扩展。
- 根据业务流程图可以看出,系统与柜机交互是通过MQTT协议进行
项目集成EMQX
集成配置
- 引入依赖
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
- MqttTest
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttTest {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://ip:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留会话connOpts.setCleanSession(true);// 设置回调client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息内容:" + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}});// 建立连接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 订阅client.subscribe(subTopic);// 消息发布所需参数MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
- 配置yaml文件
emqx:client:clientId: xt001username: xxxpassword: xxxserverURI: tcp://ip:1883keepAliveInterval: 10connectionTimeout: 30
- Emqx配置对象类(EmqxProperties)
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;@Data
@Component
@ConfigurationProperties(prefix = "emqx.client")
public class EmqxProperties {private String clientId;private String username;private String password;private String serverURI;private int keepAliveInterval;private int connectionTimeout;
}
- Emqx常量(EmqxConstants)
/*** Emqx常量信息**/
public class EmqxConstants {/** 充电宝插入,柜机发布Topic消息, 服务器监听消息 */public final static String TOPIC_POWERBANK_CONNECTED = "/sys/powerBank/connected";/** 用户扫码,服务器发布Topic消息 柜机监听消息 */public final static String TOPIC_SCAN_SUBMIT = "/sys/scan/submit/%s";/** 充电宝弹出,柜机发布Topic消息,服务器监听消息 */public final static String TOPIC_POWERBANK_UNLOCK = "/sys/powerBank/unlock";/** 柜机属性上报,服务器监听消息 */public final static String TOPIC_PROPERTY_POST = "/sys/property/post";
}
客户端和回调方法
- EmqxClientWrapper
import com.share.device.emqx.callback.OnMessageCallback;
import com.share.device.emqx.config.EmqxProperties;
import com.share.device.emqx.constant.EmqxConstants;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class EmqxClientWrapper {@Autowiredprivate EmqxProperties emqxProperties;@Autowiredprivate MqttClient client;@Autowiredprivate OnMessageCallback onMessageCallback;@PostConstructprivate void init() {MqttClientPersistence mqttClientPersistence = new MemoryPersistence();try {//新建客户端 参数:MQTT服务的地址,客户端名称,持久化client = new MqttClient(emqxProperties.getServerURI(), emqxProperties.getClientId(), mqttClientPersistence);// 设置回调client.setCallback(onMessageCallback);// 建立连接connect();} catch (MqttException e) {log.info("MqttClient创建失败");throw new RuntimeException(e);}}public Boolean connect() {// 设置连接的配置try {client.connect(mqttConnectOptions());log.info("连接成功");// 订阅String[] topics = {EmqxConstants.TOPIC_POWERBANK_CONNECTED, EmqxConstants.TOPIC_POWERBANK_UNLOCK, EmqxConstants.TOPIC_PROPERTY_POST};client.subscribe(topics);return true;} catch (MqttException e) {log.info("连接失败");e.printStackTrace();}return false;}/*创建MQTT配置类*/private MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(emqxProperties.getUsername());options.setPassword(emqxProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);//是否自动重新连接options.setCleanSession(true);//是否清除之前的连接信息options.setConnectionTimeout(emqxProperties.getConnectionTimeout());//连接超时时间options.setKeepAliveInterval(emqxProperties.getKeepAliveInterval());//心跳return options;}/*** 发布消息* @param topic* @param data*/public void publish(String topic, String data) {try {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(2);client.publish(topic, message);} catch (MqttException e) {log.info("消息发布失败");e.printStackTrace();}}}
- 回调消息处理类 :OnMessageCallback
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component;@Slf4j @Component public class OnMessageCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}@Override public void messageArrived(String topic, MqttMessage message) {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息内容:" + new String(message.getPayload()));try {// 根据主题选择不同的处理逻辑MassageHandler massageHandler = messageHandlerFactory.getMassageHandler(topic);if(null != massageHandler) {String content = new String(message.getPayload());massageHandler.handleMessage(JSONObject.parseObject(content));}} catch (Exception e) {e.printStackTrace();log.error("mqtt消息异常:{}", new String(message.getPayload()));} }@Override public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete()); } }
具体接口和方法处理
- 定义策略接口:MassageHandler
public interface MassageHandler {/*** 策略接口* @param message*/void handleMessage(JSONObject message);
}
- 具体Handler处理
import java.lang.annotation.*;
// 自定义注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GuiguEmqx {String topic();
}
- 充电宝插入处理类:PowerBankConnectedHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_CONNECTED)
public class PowerBankConnectedHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
- 充电宝弹出处理类:PowerBankUnlockHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_UNLOCK)
public class PowerBankUnlockHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
- 属性上报:PropertyPostHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_PROPERTY_POST)
public class PropertyPostHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
处理类
- MessageHandlerFactory
public interface MessageHandlerFactory {MassageHandler getMassageHandler(String topic);
}
- MessageHandlerFactoryImpl
@Service
public class MessageHandlerFactoryImpl implements MessageHandlerFactory, ApplicationContextAware {private Map<String, MassageHandler> handlerMap = new HashMap<>();/*** 初始化bean对象* @param ioc*/@Overridepublic void setApplicationContext(ApplicationContext ioc) {// 获取对象Map<String, MassageHandler> beanMap = ioc.getBeansOfType(MassageHandler.class);for (MassageHandler massageHandler : beanMap.values()) {GuiguEmqx guiguEmqx = AnnotatedElementUtils.findAllMergedAnnotations(massageHandler.getClass(), GuiguEmqx.class).iterator().next();if (null != guiguEmqx) {String topic = guiguEmqx.topic();// 初始化到maphandlerMap.put(topic, massageHandler);}}}@Overridepublic MassageHandler getMassageHandler(String topic) {return handlerMap.get(topic);}
}