Android开发遇到需要和MQ队列通信,使用StompProtocolAndroid可以实现。
StompProtocolAndroid官网(GitHub):NaikSoftware/StompProtocolAndroid: STOMP protocol via WebSocket for Android,地址 https://github.com/NaikSoftware/StompProtocolAndroid
首先app的build.gradle添加依赖
implementation "com.github.NaikSoftware:StompProtocolAndroid:1.6.4" // StompProtocolAndroid
// RxJava 2 and RxAndroid, whose types the utility class below imports
implementation "io.reactivex.rxjava2:rxjava:2.2.5"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
项目的build.gradle中添加下面内容。如果是新版Android Studio,则是在settings.gradle里面配置
allprojects {
    repositories {
        jcenter()
        // JitPack hosts the com.github.NaikSoftware:StompProtocolAndroid artifact
        maven { url "https://jitpack.io" }
    }
}
整理一个工具类StompClientUtil,直接调用即可使用
调用方式,在onCreate中调用initStompClient()做初始化,
调用addTopicData()添加回调监听,队列返回的数据,都通过这里监听处理。QUEUE_KEY是自己定义的队列名称字符串,可以通过设备id定义每个设备一个队列名称,在callBack中处理队列返回的信息。
调用sendMessage()方法向队列发送消息,QUEUE_KEY是队列名,第二个参数是发送的内容。
/**
 * Initialises the shared STOMP client and registers this screen as a listener for
 * {@code QUEUE_KEY}; every message delivered for that key is logged by the callback.
 */
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    initClick();
    StompClientUtil.getInstance().initStompClient();
    // The callback is stored inside the StompClientUtil singleton until
    // removeTopic(this, QUEUE_KEY) is called for the same owner and key.
    StompClientUtil.getInstance().addTopicData(this, QUEUE_KEY, new StompClientUtil.TopicCallBack() {
        @Override
        public void callBack(StompMessage stompMessage) {
            Log.i(TAG, stompMessage.getPayload());
        }
    });
}

/** Wires the send button to this screen's click handler. */
private void initClick() {
    binding.btnSend.setOnClickListener(this);
}

/**
 * Sends a fixed test payload to the queue when the send button is clicked.
 *
 * @param view the clicked view; may be {@code null}
 */
@Override
public void onClick(@Nullable View view) {
    super.onClick(view);
    // Compare from the non-null side: view is declared @Nullable.
    if (binding.btnSend.equals(view)) {
        StompClientUtil.getInstance().sendMessage(QUEUE_KEY, "发送内容");
    }
}
其中sendMessage()方法具体如下。其中
String conTopic = "/exchange/exchange_web.../key_...." + topicKey;
可以看出,和MQ通信是通过交换机的方式,"/exchange/exchange_web.../key_...."是MQ指定的发送路径,拼接上自己的QUEUE_KEY,在MQ中会生成一个发送队列。通过这个队列和MQ通信。这种方式生成的队列是自动删除的,一旦失去监听,队列也会自动消失。
/*** 发送数据* @param topicKey 生成的随机数* @param data 发送数据*/public void sendMessage(String topicKey, String data){if (TextUtils.isEmpty(data)){return;}if (stompClient == null) {initStompClient();return;}if (!stompClient.isConnected()) {Log.i(TAG, "sendMessage 发现isConnected false");handler.sendEmptyMessage(0);return;}Log.i(TAG, "真的发送出去啦啦啦XXXXXXXXXXXXXXXXXX");String conTopic = "/exchange/exchange_web.../key_...." + topicKey;stompClient.send(conTopic, data).unsubscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onComplete() {Log.e(TAG, "STOMP echo send successfully");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "STOMP echo send onError" + e.getMessage());}});
addTopicData()方法具体如下。其中
String topic = "/exchange/exchange_transd.../key_...."+conTopic;
"/exchange/exchange_transd.../key_...."是MQ指定了接收数据的路径,拼接上自己的QUEUE_KEY,在MQ中会生成一个接收队列,客户端通过这里接收数据。
/*** @param tempInstance* @param conTopic* @param topicCallBack*/public void addTopicData(Object tempInstance, String conTopic, TopicCallBack topicCallBack) {if (stompClient == null || compositeDisposable == null || instance == null) {return;}String disKey = tempInstance.getClass().getCanonicalName() + conTopic;if (callbackMap.containsKey(disKey)) {
// Log.e(TAG, "该对象已存在相同监听:"+disKey);return;}String topic = "/exchange/exchange_transd.../key_...."+conTopic;callbackMap.put(disKey, topicCallBack);topicMap.put(disKey, topic);Disposable disposable = stompClient.topic(topic, headers).doOnError(throwable -> {Log.e(TAG, "doOnError on subscribe topic=" + topic + throwable);}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(stompMessage -> {Log.e(TAG, "stomp监听消息=" + stompMessage);if (callbackMap != null && callbackMap.size() > 0 && callbackMap.get(disKey) != null) {callbackMap.get(disKey).callBack(stompMessage);}}, throwable -> {Log.e(TAG, "throwable on subscribe topic" + throwable);});compositeDisposable.add(disposable);disposableMap.put(disKey, disposable);}
完整代码如下:
package com.example.yuntest.utils;import android.annotation.SuppressLint;
import android.os.CountDownTimer;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.text.TextUtils;
import android.util.ArrayMap;
import android.util.Log;import java.util.ArrayList;
import java.util.Map;import io.reactivex.CompletableObserver;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.StompClient;
import ua.naiksoftware.stomp.dto.StompCommand;
import ua.naiksoftware.stomp.dto.StompHeader;
import ua.naiksoftware.stomp.dto.StompMessage;public class StompClientUtil {public static final String URL = "ws://IP地址:端口号/ws";//连接地址public static final String MQTT_USERNAME = " "; // mqtt连接用户名public static final String MQTT_PASSWORD = " "; // mqtt连接密码private static final String TAG = "StompClientUtil";private static StompClientUtil instance;public CompositeDisposable compositeDisposable;private StompClient stompClient;private final int reconnectionMaxNum = 900; //重连最大次数 时间约等于一个小时private int reconnectionNum = 0; //重连次数private ArrayList<StompHeader> headers;
// private NetChangeObserver mNetChangeObserver = null;private CountDownTimer timer;private Map<String, Disposable> disposableMap;private Map<String, TopicCallBack> callbackMap;private Map<String, String> topicMap;public static StompClientUtil getInstance() {if (instance == null) {synchronized (StompClientUtil.class) {if (instance == null) {instance = new StompClientUtil();}}}return instance;}@SuppressLint("CheckResult")public void initStompClient() {if (stompClient != null) {return;}disposableMap = new ArrayMap<>();callbackMap = new ArrayMap<>();topicMap = new ArrayMap<>();initComposite();stompClient = Stomp.over(Stomp.ConnectionProvider.OKHTTP, URL);stompClient.lifecycle().subscribe(lifecycleEvent -> {switch (lifecycleEvent.getType()) {case OPENED:reconnectionNum = 0;Log.e(TAG, "debug stomp 已连接");break;case ERROR:// javax.net.ssl.SSLException: Read error: ssl=0x7a879fa788 需要处理Log.e(TAG, " debug stomp 连接错误: " + lifecycleEvent.getException());if (reconnectionNum < reconnectionMaxNum) {handler.sendEmptyMessage(0);}break;case CLOSED:Log.e(TAG, " debug stomp closed: ");break;case FAILED_SERVER_HEARTBEAT:Log.e(TAG, " Stomp fail server heartbeat");break;}}, throwable -> Log.e(TAG, " Stomp connect Throwable:" + Log.getStackTraceString(throwable)));stompClient.withClientHeartbeat(1000 * 10).withServerHeartbeat(1000 * 10);connect();
// getNetWork();}// private void getNetWork() {
// // 网络改变的一个回掉类
// mNetChangeObserver = new NetChangeObserver() {
// @Override
// public void onNetConnected(com.caption.netmonitorlibrary.netStateLib.NetUtils.NetType type) {
// if (type == com.caption.netmonitorlibrary.netStateLib.NetUtils.NetType.NONE) {
// onNetworkDisConnected();
// return;
// }
// onNetworkConnected(type);
// }
//
// @Override
// public void onNetDisConnect() {
// onNetworkDisConnected();
// }
// };
// NetStateReceiver.registerObserver(mNetChangeObserver);
// }
//
// private void onNetworkConnected(NetUtils.NetType type) { //辅助一层,确保重连
// if (!stompClient.isConnected() && timer == null && reconnectionNum < reconnectionMaxNum) {
// handler.sendEmptyMessage(0);
// }
// }
//
// private void onNetworkDisConnected() {
//
// }public StompClient getStompClient() {return stompClient;}@SuppressLint("HandlerLeak")Handler handler = new Handler(Looper.getMainLooper()) {@Overridepublic void dispatchMessage(Message msg) {if (msg.what == 0) { //重连reConnect();} else if (msg.what == 1) { //重新添加监听Log.e(TAG, "重连中");reconnectionNum++;connect();reConnectTopic();}}};public void stompDiyReConnect() {if (stompClient == null) {initStompClient();}handler.sendEmptyMessage(0);}private void reConnect() {if (timer != null) {return;}timer = new CountDownTimer(4 * 1000, 1000) {@Overridepublic void onTick(long l) {}@Overridepublic void onFinish() {timer = null;stompClient.disconnectCompletable().subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "Disposable code:" + d.hashCode());}@Overridepublic void onComplete() {Log.e(TAG, "断开连接,准备重连");handler.sendEmptyMessage(1);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "Disconnect error:" + e);}});}};timer.start();}private void connect() {if (stompClient == null) {initStompClient();}headers = new ArrayList<>();headers.add(new StompHeader("name", MQTT_USERNAME));headers.add(new StompHeader("password", MQTT_PASSWORD));stompClient.connect(headers);}/*** @param tempInstance* @param conTopic* @param topicCallBack*/public void addTopicData(Object tempInstance, String conTopic, TopicCallBack topicCallBack) {if (stompClient == null || compositeDisposable == null || instance == null) {return;}String disKey = tempInstance.getClass().getCanonicalName() + conTopic;if (callbackMap.containsKey(disKey)) {
// Log.e(TAG, "该对象已存在相同监听:"+disKey);return;}String topic = "/exchange/exchange_transd.../key_...."+conTopic;callbackMap.put(disKey, topicCallBack);topicMap.put(disKey, topic);Disposable disposable = stompClient.topic(topic, headers).doOnError(throwable -> {Log.e(TAG, "doOnError on subscribe topic=" + topic + throwable);}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(stompMessage -> {Log.e(TAG, "stomp监听消息=" + stompMessage);if (callbackMap != null && callbackMap.size() > 0 && callbackMap.get(disKey) != null) {callbackMap.get(disKey).callBack(stompMessage);}}, throwable -> {Log.e(TAG, "throwable on subscribe topic" + throwable);});compositeDisposable.add(disposable);disposableMap.put(disKey, disposable);}private void reConnectTopic() {if (callbackMap == null || stompClient == null || compositeDisposable == null|| callbackMap.size() < 1) {return;}Log.e(TAG, "重新监听");for (Map.Entry<String, TopicCallBack> entry : callbackMap.entrySet()) {String mapKey = entry.getKey();TopicCallBack mapValue = entry.getValue();Log.e(TAG, "key:" + mapKey);compositeDisposable.remove(disposableMap.get(mapKey));Disposable disposable = stompClient.topic(topicMap.get(mapKey), headers).doOnError(throwable -> {Log.e(TAG, "doOnError on subscribe topic=" + topicMap.get(mapKey) + throwable);handler.sendEmptyMessage(0);}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(stompMessage -> {Log.e(TAG, "stomp监听消息=" + stompMessage.getPayload());if (mapValue != null) {mapValue.callBack(stompMessage);}}, throwable -> {Log.e(TAG, "throwable on subscribe topic" + throwable);});compositeDisposable.add(disposable);disposableMap.put(mapKey, disposable);}}public void removeTopic(Object tempInstance, String conTopic) {if (compositeDisposable == null || stompClient == null) {return;}if (disposableMap == null || disposableMap.size() < 1 || callbackMap == null) {return;}String disKey = tempInstance.getClass().getCanonicalName() + conTopic;if 
(!disposableMap.containsKey(disKey)) {Log.e(TAG, "不存在对应监听");return;}topicMap.remove(disKey);callbackMap.remove(disKey);compositeDisposable.remove(disposableMap.get(disKey));}/*** 是否连接中* @return*/public boolean isConnected(){return stompClient.isConnected();}/*** 发送数据* @param topicKey 生成的随机数* @param data 发送数据*/public void sendMessage(String topicKey, String data){if (TextUtils.isEmpty(data)){return;}if (stompClient == null) {initStompClient();return;}if (!stompClient.isConnected()) {Log.i(TAG, "sendMessage 发现isConnected false");handler.sendEmptyMessage(0);return;}Log.i(TAG, "真的发送出去啦啦啦XXXXXXXXXXXXXXXXXX");String conTopic = "/exchange/exchange_web.../key_...." + topicKey;stompClient.send(conTopic, data).unsubscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onComplete() {Log.e(TAG, "STOMP echo send successfully");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "STOMP echo send onError" + e.getMessage());}});}/*** 示例:/topic/bid_list*/public void sendMessage(String body, SendMessageCallBack sendMessageCallBack) {if (stompClient == null) {initStompClient();return;}if (!stompClient.isConnected()) {handler.sendEmptyMessage(0);return;}stompClient.send(new StompMessage(StompCommand.SEND, headers, body)).unsubscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onComplete() {if (sendMessageCallBack != null) {sendMessageCallBack.onSuccess();}Log.e(TAG, "STOMP echo send successfully");}@Overridepublic void onError(Throwable e) {if (sendMessageCallBack != null) {sendMessageCallBack.onError();}Log.e(TAG, "STOMP echo send onError");}});}public interface TopicCallBack {void callBack(StompMessage stompMessage);}public interface SendMessageCallBack {void 
onSuccess();void onError();}private void initComposite() {if (compositeDisposable != null) {compositeDisposable.dispose();}compositeDisposable = new CompositeDisposable();}//取消订阅public void unSubcribe() {if (compositeDisposable != null) {compositeDisposable.dispose();}}public void stopStomp() {if (stompClient != null) {stompClient.disconnect();}if (timer != null) {timer.cancel();}if (disposableMap != null) {disposableMap.clear();disposableMap = null;}// NetStateReceiver.removeRegisterObserver(mNetChangeObserver);unSubcribe();stompClient = null;timer = null;if (handler != null) {handler.removeMessages(0);}}}