在Android中使用 MQTT 服务实现消息通信

1.摘要


MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的、基于发布/订阅(Publish/Subscribe)模式的通信协议,最初由 IBM 在1999年开发。它设计用于在低带宽、不稳定的网络环境下进行通信,适用于物联网(IoT)和机器对机器(M2M)通信。

2.准备工作


在项目的 build.gradle 文件中添加 MQTT 相关的依赖库:

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

添加权限:

    <uses-permission android:name="android.permission.WAKE_LOCK" /><uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /><uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /><uses-permission android:name="android.permission.READ_PHONE_STATE" /><uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" /><uses-permission android:name="android.permission.INTERNET" />

3.代码实现


class MqttService: Service() {private val TAG = "MqttService"private lateinit var mqttAndroidClient: MqttAndroidClientprivate lateinit var mqttConnectOptions: MqttConnectOptionsprivate val serverUri = "tcp://YOUR_BROKER_URI:PORT"// 修改为你的 MQTT 服务器地址private val clientId = "AndroidClient"// 修改为你的客户端 IDprivate val username = "YOUR_USERNAME"// 修改为你的用户名private val password = "YOUR_PASSWORD"// 修改为你的密码private val PUBLISH_TOPIC = "YOUR_PUBLISH_TOPIC"private lateinit var handler: Handlerprivate val retryInterval: Long = 5000 // 重试间隔,单位:毫秒(5秒)override fun onCreate() {super.onCreate()handler = Handler()initializeMqttClient()}// 初始化 MQTT 客户端private fun initializeMqttClient() {mqttAndroidClient = MqttAndroidClient(applicationContext, serverUri, clientId)mqttConnectOptions = MqttConnectOptions().apply {userName = username  // 用户名password = this@MqttService.password.toCharArray()  // 密码isAutomaticReconnect = true //是否自动尝试重新连接isCleanSession = false // 清除缓存connectionTimeout = 10  // 设置超时时间,单位:秒keepAliveInterval = 20  // 心跳包发送间隔,单位:秒//当客户端与 MQTT 代理建立连接时,客户端可以指定一个遗嘱消息。如果客户端在建立连接后因为某种原因(例如网络故障或客户端异常退出)非正常断开连接,MQTT 代理将会发布这条遗嘱消息给所有订阅了客户端所订阅主题的客户端。这样,其他订阅者就可以知道客户端已经离线了。//Topic:遗嘱消息要发布到的主题。//Payload:遗嘱消息的内容。//QoS(Quality of Service):遗嘱消息的传输质量要求。//Retained:指定是否将遗嘱消息保留在 MQTT 代理中,以便新订阅者在连接时立即接收到该消息。setWill(PUBLISH_TOPIC, "I am offline".toByteArray(), 1, false)//设置遗嘱消息}mqttAndroidClient.setCallback(object : MqttCallbackExtended {override fun connectComplete(reconnect: Boolean, serverURI: String?) {Log.d(TAG, "连接到: $serverURI")}override fun connectionLost(cause: Throwable?) {Log.d(TAG, "连接丢失: ${cause?.message}")}override fun messageArrived(topic: String?, message: MqttMessage?) {Log.d(TAG, "消息到达: $topic - ${message.toString()}")}//消息传递完成时的回调函数override fun deliveryComplete(token: IMqttDeliveryToken?) {Log.d(TAG, "Delivery complete")}})connectToMqttBroker()}// 连接到 MQTT 代理private fun connectToMqttBroker() {try {if (isNetworkConnected()) {mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "连接到MQTT代理")//订阅主题subscribeToTopic(PUBLISH_TOPIC)}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.e(TAG, "连接MQTT代理失败: ${exception?.message}")//因为设置了 isAutomaticReconnect 为 true 所以不需要手动重连操作,mqtt会自动重连}})} else {Log.d(TAG, "无网络连接")retryConnectToMqttBroker()}} catch (e: MqttException) {e.printStackTrace()}}// 重连机制private fun retryConnectToMqttBroker() {handler.postDelayed({Log.d(TAG, "重试连接到MQTT代理")connectToMqttBroker()}, retryInterval)}// 发布消息fun publishMessage(topic: String, message: String) {try {if (mqttAndroidClient.isConnected) {val mqttMessage = MqttMessage()mqttMessage.payload = message.toByteArray()mqttAndroidClient.publish(topic, mqttMessage)Log.d(TAG, "消息发布到主题: $topic")} else {Log.e(TAG, "客户端未连接,无法发布消息.")}} catch (e: MqttException) {e.printStackTrace()}}// 订阅主题fun subscribeToTopic(topic: String) {try {if (mqttAndroidClient.isConnected) {mqttAndroidClient.subscribe(topic, 1, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "订阅主题: $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.e(TAG, "订阅主题失败: $topic")}})} else {Log.e(TAG, "客户端未连接,无法订阅主题.")}} catch (e: MqttException) {e.printStackTrace()}}// 取消订阅主题fun unsubscribeFromTopic(topic: String) {try {if (mqttAndroidClient.isConnected) {mqttAndroidClient.unsubscribe(topic, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken?) {Log.d(TAG, "取消订阅主题: $topic")}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {Log.e(TAG, "未能取消订阅主题: $topic")}})} else {Log.e(TAG, "客户端未连接,无法取消订阅主题.")}} catch (e: MqttException) {e.printStackTrace()}}// 判断网络是否连接private fun isNetworkConnected(): Boolean {val connectivityManager = getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManagerval activeNetwork: NetworkInfo? = connectivityManager.activeNetworkInforeturn activeNetwork?.isConnected == true}override fun onBind(intent: Intent?): IBinder? {return null}override fun onDestroy() {super.onDestroy()disconnectClient()}// 断开 MQTT 客户端连接private fun disconnectClient() {try {if (mqttAndroidClient.isConnected) {mqttAndroidClient.disconnect()}} catch (e: MqttException) {e.printStackTrace()}}
}

4.开启服务


在AndroidManifest.xml中的application标签内注册服务

 <service android:name=".MqttService" /><service android:name="org.eclipse.paho.android.service.MqttService" />

5.功能点介绍


MQTT 客户端初始化:
  • 通过 MqttAndroidClient 和 MqttConnectOptions 初始化 MQTT 客户端。
  • 配置 MQTT 客户端的连接选项,包括用户名、密码、自动重连、清除会话、超时设置和心跳包发送间隔等。
  • 配置遗嘱消息,当客户端非正常断开时,MQTT 代理将发布该消息(setWill 方法)。
MQTT 客户端回调:
  • 实现 MqttCallbackExtended 接口,用于处理连接完成、连接丢失、消息到达以及消息传递完成时的回调。.
连接到 MQTT 代理:
  • 尝试连接到 MQTT 代理,如果网络连接正常则进行连接,并订阅指定的主题(PUBLISH_TOPIC)。
  • 如果连接失败且设置了自动重连选项,客户端会自动尝试重连。
  • 如果没有网络连接,则通过重试机制定时尝试重新连接。
消息发布:
  • 实现 publishMessage 方法,用于向指定主题发布消息。
订阅和取消订阅主题:
  • 实现 subscribeToTopic 方法,用于订阅指定的主题。
  • 实现 unsubscribeFromTopic 方法,用于取消订阅指定的主题。

6.其他介绍


MQTT中 QoS 级别的意义:

QoS 0:最多一次传递(At most once)

  • 也称为“至多一次”传递
  • 发布消息后,不会收到任何确认或回执。
  • MQTT 代理不会跟踪消息传递,也不会重新传递丢失的消息。
  • 此级别适用于那些可以容忍偶尔丢失消息的应用场景,例如实时数据流,传感器数据等。

QoS 1:至少一次传递(At least once)

  • 确保消息至少被传递一次。
  • 在发布消息后,发布者会收到一个 PUBACK(发布确认)消息作为回复。
  • 如果 PUBACK 丢失或未收到确认,则 MQTT 客户端会尝试重新发送消息,直到收到 PUBACK。
  • 此级别适用于对消息到达的顺序不是特别关心,但确保消息至少被传递一次的应用场景。

QoS 2:只有一次传递(Exactly once)

  • 提供最高的传递保证,确保每条消息只传递一次。
  • 在发布消息后,发布者会收到两个确认消息:PUBREC(发布接收)和 PUBREL(发布释放)。
  • MQTT 客户端会等待收到 PUBREL 消息后再发送 PUBCOMP(发布完成)消息作为最终确认。
  • 此级别适用于对消息传递的顺序和确保每条消息只传递一次的高度敏感的应用场景,如金融交易、命令和控制等。
MQTT断开连接((32109) - java.io.EOFException)

可以看我的这篇文章:MQTT断开连接((32109) - java.io.EOFException)

7.最后


MQTT在物联网开发中必不可少,掌握相关知识非常重要,此篇文章用来温故一下MQTT使用流程,知识点不多,代码已经封装的差不多了,方便本人及各位拿来即用 更多功能自行拓展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/24479.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

产品创新管理:从模仿到引领,中国企业的创新之路

一、引言 在全球化竞争日益激烈的今天&#xff0c;科技创新已成为推动国家经济增长和社会进步的关键动力。中国自改革开放四十年来&#xff0c;在科技创新领域取得了举世瞩目的成就&#xff0c;从跟踪模仿到自主研发&#xff0c;再到自主创新、开放创新和协同创新并举&#xf…

【机器学习300问】110、什么是Lasso回归模型?

LASSO回归的全称是Least Absolute Shrinkage and Selection Operator&#xff0c;中文叫“最小绝对收缩和选择算子”&#xff0c;用一个比喻来初步感受一下它的作用&#xff1a; 想象你在整理一个杂乱无章的房间&#xff0c;里面堆满了各种物品&#xff08;代表众多的预测变量&…

【Vue】小兔鲜首页 - 拆分模块组件 - 局部注册

文章目录 一、分析二、局部注册 一、分析 小兔仙组件拆分示意图 开发思路 分析页面&#xff0c;按模块拆分组件&#xff0c;搭架子 (局部或全局注册) 根据设计图&#xff0c;编写组件 html 结构 css 样式 (已准备好) 拆分封装通用小组件 (局部或全局注册)&#xff0c;一般这…

人形机器人的应用场景

随着技术的不断进步和人工智能的快速发展&#xff0c;人形机器人逐渐走入人们的视野&#xff0c;并在各个领域中展现出广泛的应用潜力。本文将探讨人形机器人的主要应用场景&#xff0c;包括服务行业助手、教育领域应用、医疗保健助手、工业制造伙伴、家庭服务助手、军事领域执…

Develop an application using LangChain

模型、提示和解析器 模型 from langchain.chat_models import ChatOpenAIchat ChatOpenAI(temperature0.0) ChatOpenAI的默认模型为gpt-3.5-turbo 提示模板 from langchain.prompts import ChatPromptTemplatetemplate_string """把由三个反引号分隔的文…

浅析Git子模块

Git 子模块&#xff08;Submodule&#xff09;是 Git 的一个功能&#xff0c;允许将一个 Git 仓库作为另一个 Git 仓库的子目录。这使得可以在一个项目中包含并管理一个或多个独立的项目&#xff08;仓库&#xff09;。子模块保持自己的独立版本控制&#xff0c;使得主项目和子…

网络通信Socket的简单案例

1.客户端代码 import java.io.*; import java.net.Socket;public class GreetingClient {public static void main(String[] args) throws IOException {//准备目的地参数String ip "localhost";int port6006;//建立工人Socket client new Socket(ip, port);//建…

arcgis如何给没有连通的路打交点

1、在打交点的时候需要先有图层&#xff0c;图层的构建流程如下所示 1、找到目录 2、先新建一个文件夹 3、在新建的文件夹下新建一个文件地理数据库 4、在文件地理数据库下&#xff0c;新建一个要素类数据集 5、在要素类数据集下进行数据导入&#xff0c;选择单个导入 6、在要…

Meta Llama 3 .transpose().contiguous().view

Meta Llama 3 .transpose().contiguous().view() flyfish 参考地址 https://pytorch.org/docs/stable/generated/torch.transpose.html transpose美[trnspoʊz] 英[trnspəʊz;trɑːns-;-nz-] v. 使换位 / 转移 / 转换 / 调换n. <数>转置&#xff08;矩&#xff09…

2 程序的灵魂—算法-2.2 简单算法举例-【例 2.1】

【例 2.1】求 12345。 最原始方法&#xff1a; 步骤 1&#xff1a;先求 12&#xff0c;得到结果 2。 步骤 2&#xff1a;将步骤 1 得到的乘积 2 乘以 3&#xff0c;得到结果 6。 步骤 3&#xff1a;将 6 再乘以 4&#xff0c;得 24。 步骤 4&#xff1a;将 24 再乘以 5&#xf…

据报道,FTC 和 DOJ 对微软、OpenAI 和 Nvidia 展开反垄断调查

据《纽约时报》报道&#xff0c;联邦贸易委员会 (FTC) 和司法部 (DOJ) 同意分担调查微软、OpenAI 和 Nvidia 潜在反垄断违规行为的职责。 美国司法部将牵头对英伟达进行调查&#xff0c;而联邦贸易委员会将调查 OpenAI 与其最大投资者微软之间的交易。 喜好儿网 今年 1 月&a…

胶南代理记账,为您提供专业、便捷的会计服务

欢迎来到胶南代理记账服务站&#xff0c;这里我们专注于为企业提供专业的会计服务&#xff0c;无论您是初创企业还是已经在业界有一定规模的企业&#xff0c;我们都将以最专业的态度和最高效的服务为您量身定制合适的记账方案。 我们的目标不仅是帮助您完成财务报告的制作&…

Flink mongo Kafka

Apache Flink 是一个流处理和批处理的开源平台&#xff0c;用于在分布式环境中处理无界和有界数据流。它提供了用于数据处理的数据流 API&#xff08;DataStream API&#xff09;和表 API&#xff08;Table API&#xff09;&#xff0c;并可以与各种外部数据源和存储系统进行交…

Diffusers代码学习: IP-Adapter

从操作的角度来看&#xff0c;IP-Adapter和图生图是很相似的&#xff0c;都是有一个原始的图片&#xff0c;加上提示词&#xff0c;生成目标图片。但它们的底层实现方式是完全不一样的&#xff0c;我们通过源码解读来看一下。以下是ip adapter的实现方式 # 以下代码为程序运行…

51单片机通过键盘输入数值,控制流水灯的方向和速度。

1、功能描述 通过键盘输入数值&#xff0c;控制流水灯的方向和速度。 2、实验原理 键盘输入原理&#xff1a; 键盘通常通过矩阵形式连接到单片机的I/O端口。当用户按下某个按键时&#xff0c;会改变键盘矩阵中对应行和列的电平&#xff0c;单片机通过检测这些变化来确定哪个按…

Python opencv读取深度图,网格化显示深度

效果图&#xff1a; 代码&#xff1a; import cv2 import osimg_path "./outdir/180m_norm_depth.png" depth_img cv2.imread(img_path, cv2.IMREAD_ANYDEPTH) filename os.path.basename(img_path) img_hig, img_wid depth_img.shape # (1080, 1920) print(de…

C# MemoryCache 缓存应用

摘要 缓存是一种非常常见的性能优化技术&#xff0c;在开发过程中经常会用到。.NET提供了内置的内存缓存类 MemoryCache&#xff0c;它可以很方便地存储数据并在后续的请求中快速读取&#xff0c;从而提高应用程序的响应速度。 正文 通过使用 Microsoft.Extensions.Caching.Me…

mqtt-emqx:设置遗嘱消息

【pom.xml】 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.12.RELEASE</version> </dependency> <dependency><groupId>org.eclipse…

OpenAI新成果揭秘语言模型神经活动:稀疏自编码器的前沿探索

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

手机自动化测试:6.某团文字的提取

我们要进行的操作重点就是文字的提取&#xff0c;然后循环&#xff0c;提取不是吗&#xff1f; try:# 使用XPath定位带有index属性的FrameLayout元素frame_layout_elements WebDriverWait(driver, timeout, poll_frequency).until(EC.presence_of_all_elements_located((By.X…