一、引包
1.1.模块的build.gradle
// mqtt框架
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'org.bouncycastle:bcpkix-jdk15on:1.59'
1.2 旧版AndroidStudio开发工具在项目的build.gradle引入
maven {url "https://repo.eclipse.org/content/repositories/paho-snapshots/"}
1.3 新版则在settings.gradle引入
maven {url "https://repo.eclipse.org/content/repositories/paho-snapshots/"}
二、工具类封装
/*** mqtt* 服务*/public class XyMqttService extends Service {public final static String TAG = "数据处理";public static MqttAndroidClient mqttAndroidClient;private static MqttConnectOptions mMqttConnectOptions;//wss://z66811d5.ala.cn-hangzhou.emqxsl.cn:8084/mqttpublic static String HOST = "tcp://www.....:1883";//服务器地址(协议+地址+端口号)
// public static String HOST = "tcp://.........n:8883";//服务器地址(协议+地址+端口号)public String USERNAME = "xiaoya";//用户名public String PASSWORD = "xiaoya";//密码public static String PUBLISH_TOPIC = "xiaoya/video/pull/1";//发布主题public static String RESPONSE_TOPIC = "xiaoya/video/1111";//响应主题public String CLIENTID = "safffadasaafqedq2";//设备唯一标识/*** 订阅主题:* 小雅视频拉取:xiaoya/video/pull/+* 小雅视频通话:xiaoya/video/chat/+*/@Overridepublic int onStartCommand(Intent intent, int flags, int startId) {init();return START_NOT_STICKY;//非粘性的 service强制杀死后,不会尝试重新启动service}@Nullable@Overridepublic IBinder onBind(Intent intent) {return null;}/*** 开启服务*/public static void startService(Context mContext) {mContext.startService(new Intent(mContext, XyMqttService.class));}/*** 发布 (模拟其他客户端发布消息)** @param message 消息*/public static void publish(String message) {String topic = PUBLISH_TOPIC;Integer qos = 1;Boolean retained = false;try {//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());} catch (MqttException e) {e.printStackTrace();}}/*** 响应 (收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等)** @param message 消息*/public static void response(String message) {String topic = RESPONSE_TOPIC;Integer qos = 1;Boolean retained = false;try {//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());} catch (MqttException e) {e.printStackTrace();}}/*** 初始化*/private void init() {String serverURI = HOST; //服务器地址(协议+地址+端口号)Logger.d("==初始化MQ:" + serverURI);if (mqttAndroidClient == null) {mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调}if (mMqttConnectOptions == null) {mMqttConnectOptions = new MqttConnectOptions();try {
// InputStream caCrtFileI = getResources().openRawResource(R.raw.ca);
// mMqttConnectOptions.setSocketFactory(getSingleSocketFactory(caCrtFileI));mMqttConnectOptions.setCleanSession(true); //设置是否清除缓存mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒mMqttConnectOptions.setKeepAliveInterval(20); //设置心跳包发送间隔,单位:秒mMqttConnectOptions.setUserName(USERNAME); //设置用户名mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码} catch (Exception e) {e.printStackTrace();}}// last will messageboolean doConnect = true;String message = "{\"status\":\"" + CLIENTID + "\"}";String topic = PUBLISH_TOPIC;Integer qos = 1;Boolean retained = true;if ((!message.equals("")) || (!topic.equals(""))) {// 最后的遗嘱try {mMqttConnectOptions.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());} catch (Exception e) {Logger.i("==Exception Occured==");doConnect = false;iMqttActionListener.onFailure(null, e);}}if (doConnect) {doClientConnection();}}/*** 连接MQTT服务器*/private static void doClientConnection() {try {if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {Logger.d("====连接MQTT服务器===" + HOST);mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);}} catch (Exception e) {e.printStackTrace();}}/*** 判断网络是否连接*/private static boolean isConnectIsNomarl() {ConnectivityManager connectivityManager = (ConnectivityManager) XiaoYaApp.getContext().getSystemService(Context.CONNECTIVITY_SERVICE);NetworkInfo info = connectivityManager.getActiveNetworkInfo();if (info != null && info.isAvailable()) {String name = info.getTypeName();Logger.d("===当前网络名称:" + name);return true;} else {Logger.d("===没有可用网络===");/*没有可用网络的时候,延迟3秒再尝试重连*/new Handler().postDelayed(new Runnable() {@Overridepublic void run() {Logger.d("===没有可用网络doClientConnection===");doClientConnection();}}, 3000);return false;}}//MQTT是否连接成功的监听private static IMqttActionListener iMqttActionListener = new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken arg0) {Logger.d("==mqtt连接成功 " + HOST);try {if (mqttAndroidClient != null) 
{mqttAndroidClient.subscribe(PUBLISH_TOPIC, 1);//订阅主题,参数:主题、服务质量}} catch (Exception e) {e.printStackTrace();}}@Overridepublic void onFailure(IMqttToken arg0, Throwable arg1) {arg1.printStackTrace();Logger.d("==mqtt连接失败 ==" + arg1);doClientConnection();//连接失败,重连(可关闭服务器进行模拟)}};//订阅主题的回调private MqttCallback mqttCallback = new MqttCallback() {@Overridepublic void messageArrived(String topic, MqttMessage msgStr) throws Exception {try {String enCodeMsg = new String(msgStr.getPayload());Logger.d("==收到消息: " + enCodeMsg);if (enCodeMsg.contains("请求视频推流")) {initLiveCamera();} else if (enCodeMsg.contains("退出视频推流")) {deStroyLive();}//收到消息,这里弹出Toast表示。如果需要更新UI,可以使用广播或者EventBus进行发送//收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等
// response("message arrived");} catch (Exception e) {e.printStackTrace();}}@Overridepublic void deliveryComplete(IMqttDeliveryToken arg0) {}@Overridepublic void connectionLost(Throwable arg0) {Logger.d("==连接断开 ");
// doClientConnection();//连接断开,重连}};public static void disconnect(Context context) {try {if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {mqttAndroidClient.unsubscribe(PUBLISH_TOPIC);mqttAndroidClient.unregisterResources();mqttAndroidClient.disconnect(0); //断开连接mqttAndroidClient = null;context.stopService(new Intent(context, XyMqttService.class));ZegoExpressEngine.destroyEngine(new IZegoDestroyCompletionCallback() {@Overridepublic void onDestroyCompletion() {//销毁成功}});}} catch (Exception e) {e.printStackTrace();}}/*** 单向* 认证*/public static SSLSocketFactory getSingleSocketFactory(InputStream caCrtFileInputStream) throws Exception {Security.addProvider(new BouncyCastleProvider());X509Certificate caCert = null;BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);CertificateFactory cf = CertificateFactory.getInstance("X.509");while (bis.available() > 0) {caCert = (X509Certificate) cf.generateCertificate(bis);}KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());caKs.load(null, null);caKs.setCertificateEntry("cert-certificate", caCert);TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());tmf.init(caKs);SSLContext sslContext = SSLContext.getInstance("TLSv1.2");sslContext.init(null, tmf.getTrustManagers(), null);return sslContext.getSocketFactory();}}