MQTT是一种基于发布/订阅模式的消息传输协议,它使用TCP/IP协议进行通信。MQTT的设计原则是轻量级、简单和可靠,适用于各种网络环境和设备。MQTT采用了订阅(Subscribe)和发布(Publish)的模式,客户端可以订阅感兴趣的主题(Topic),同时也可以发布消息到指定的主题。
接入:
一:添加MQTT依赖库 在Android项目的build.gradle文件中添加MQTT依赖库的引用,例如:复制
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation('org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0')
2:添加申请权限
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" /><!--mqtt协议--><service android:name=".data.mqtt.MqttService" /><service android:name="org.eclipse.paho.android.service.MqttService" /> <!--MqttService-->
3:创建Mqttservice继承自service
/*** date:2023/8/16* author:wangsimin* funcation:mqtt协议*/
public class MqttService extends Service {public static final String TAG = "MqttService";private static MqttAndroidClient client;private MqttConnectOptions conOpt;private String serverURI = "tcp://xxx.xxx.xxx:0000"; //服务器地址private String userName = ""; //账号private String passWord = ""; //密码private static String topic = ""; //订阅名private String clientId = Settings.Secure.ANDROID_ID; //客户端IDprivate int qos = 2;@Overridepublic int onStartCommand(Intent intent, int flags, int startId) {init();return super.onStartCommand(intent, flags, startId);}@Nullable@Overridepublic IBinder onBind(Intent intent) {return null;}private void init() {Log.i(TAG, "initMqtt");// 服务器地址(协议+地址+端口号)client = new MqttAndroidClient(App.instance(), serverURI, clientId);conOpt = new MqttConnectOptions();// 清除缓存conOpt.setCleanSession(true);// 设置超时时间,单位:秒conOpt.setConnectionTimeout(10);// 心跳包发送间隔,单位:秒conOpt.setKeepAliveInterval(20);// 用户名conOpt.setUserName(userName);// 密码conOpt.setPassword(passWord.toCharArray());try {// 设置MQTT监听并且接受消息client.setCallback(mqttCallback);} catch (Exception e) {e.printStackTrace();}// last will messageboolean doConnect = true;// 最后try {conOpt.setWill(topic, clientId.getBytes(), qos, false);} catch (Exception e) {e.printStackTrace();doConnect = false;iMqttActionListener.onFailure(null, e);}if (doConnect) {doClientConnection();}}/*** 连接MQTT服务器*/private void doClientConnection() {try {if (!client.isConnected() && isConnectIsNomarl()) {client.connect(conOpt, null, iMqttActionListener);}} catch (MqttException e) {e.printStackTrace();}}// MQTT是否连接成功IMqttActionListener iMqttActionListener = new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken arg0) {Log.i(TAG, "connect onSuccess " + topic);try {// 订阅myTopic话题,当订阅多条频道,需要遍历逐条订阅,否则有可能订阅失败client.subscribe(topic, qos, null, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {Log.i(TAG, "subscribe onSuccess ");}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {exception.printStackTrace();Log.e(TAG, "subscribe onFailure ");}});} catch (MqttException e) {e.printStackTrace();}}@Overridepublic void onFailure(IMqttToken arg0, Throwable arg1) {Log.e(TAG, "connect onFailure ");arg1.printStackTrace();doClientConnection();//连接失败,重连(可关闭服务器进行模拟)}};// MQTT监听并且接受消息MqttCallback mqttCallback = new MqttCallback() {@Overridepublic void messageArrived(String topic, MqttMessage message) {Log.i(TAG, "messageArrived:" + new String(message.getPayload()));//订阅信息,接收的信息messagetry {if (TextUtils.isEmpty(new String(message.getPayload()))) {return;}} catch (Exception e) {e.printStackTrace();}}@Overridepublic void deliveryComplete(IMqttDeliveryToken arg0) {Log.e(TAG, "deliveryComplete");}@Overridepublic void connectionLost(Throwable arg0) {Log.e(TAG, "connectionLost");// 如果是登录状态 失去连接,重连doClientConnection();}};/*** 判断网络是否连接*/private boolean isConnectIsNomarl() {ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);NetworkInfo info = connectivityManager.getActiveNetworkInfo();return info != null && info.isAvailable();}//发布消息 msgpublic static void publish(String msg) {try {client.publish(topic, msg.getBytes(), 2, false);} catch (MqttException e) {e.printStackTrace();}}/*** 开启服务** @param context*/public static void startService(Context context) {context.startService(new Intent(context, MqttService.class));}public static void stopService(Context context) {context.stopService(new Intent(context, MqttService.class));}@Overridepublic void onDestroy() {try {if (client != null) {client.disconnect(); //服务销毁,断开连接}} catch (MqttException e) {e.printStackTrace();}super.onDestroy();}}
最后调用即可使用
//启动服务MqttService.startService(this);
最后总结下遇到的问题
不可识别的包(32108)
解决:换了serverURI的端口号即可
相关错误码:
1=无效协议版本
2=无效客户机标识
3=代理程序不可用
4=错误的用户名或密码
5=无权连接
6=意外错误
32000=等待来自服务器的响应时超时
32100=已连接客户机
32101=已断开客户机连接
32102=客户机正在断开连接
32103=无法连接至服务器
32104=客户机未连接
32105=指定的 SocketFactory 类型与代理程序 URI 不匹配
32106=SSL 配置错误
32107=不允许通过回调方法断开连接
32108=不可识别的包
32109=已断开连接
32110=已在进行连接
32111=客户机已关闭
32200=持久性已在使用中
32201=令牌已在使用中
32202=正在进行过多的发布