使用StompProtocolAndroid连接MQ通信

Android开发遇到需要和MQ队列通信,使用StompProtocolAndroid可以实现。

StompProtocolAndroid官网:GitHub - NaikSoftware/StompProtocolAndroid: STOMP protocol via WebSocket for Android

首先app的build.gradle添加依赖

    implementation "com.github.NaikSoftware:StompProtocolAndroid:1.6.4"//StompProtocolAndroidimplementation "io.reactivex.rxjava2:rxjava:2.2.5"implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

项目的build.gradle中添加下面内容。如果是新版studio,是在setting.gradle里面配置

allprojects {repositories {jcenter()maven { url "https://jitpack.io" }}
}

整理一个工具类StompClientUtil,直接调用即可使用

调用方式,在onCreate中调用initStompClient()做初始化,

调用addTopicData()添加回调监听,队列返回的数据,都通过这里监听处理。QUEUE_KEY是自己定义的队列名称字符串,可以通过设备id定义每个设备一个队列名称,在callBacke中处理队列返回的信息。

调用sendMessage()方法向队列发送消息,QUEUE_KEY是队列名,第二个参数是发送的内容。

@Overrideprotected void onCreate(@Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);initClick();StompClientUtil.getInstance().initStompClient();StompClientUtil.getInstance().addTopicData(this, QUEUE_KEY, new StompClientUtil.TopicCallBack() {@Overridepublic void callBack(StompMessage stompMessage) {Log.i(TAG, stompMessage.getPayload());}});}private void initClick(){binding.btnSend.setOnClickListener(this);}@Overridepublic void onClick(@Nullable View view) {super.onClick(view);if (view.equals(binding.btnSend)){StompClientUtil.getInstance().sendMessage(QUEUE_KEY, "发送内容");}}

其中sendMessage()方法具体如下。其中

String conTopic = "/exchange/exchange_web.../key_...." + topicKey;

可以看出,和MQ通信是通过交换机的方式,"/exchange/exchange_web.../key_...."是MQ指定的发送路径,拼接上自己的QUEUE_KEY,在MQ中会生成一个发送队列。通过这个队列和MQ通信。这种方式生成的独立是自动消息的,一旦失去监听,队列也会自动消失。

/*** 发送数据* @param topicKey 生成的随机数* @param data 发送数据*/public void sendMessage(String topicKey, String data){if (TextUtils.isEmpty(data)){return;}if (stompClient == null) {initStompClient();return;}if (!stompClient.isConnected()) {Log.i(TAG, "sendMessage 发现isConnected false");handler.sendEmptyMessage(0);return;}Log.i(TAG, "真的发送出去啦啦啦XXXXXXXXXXXXXXXXXX");String conTopic = "/exchange/exchange_web.../key_...." + topicKey;stompClient.send(conTopic, data).unsubscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onComplete() {Log.e(TAG, "STOMP echo send successfully");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "STOMP echo send onError" + e.getMessage());}});

addTopicData()方法具体如下。其中

String topic = "/exchange/exchange_transd.../key_...."+conTopic;

"/exchange/exchange_transd.../key_...."是MQ指定了接收数据的路径,拼接上自己的QUEUE_KEY,在MQ中会生成一个接收队列,客户端通过这里接收数据。

/*** @param tempInstance* @param conTopic* @param topicCallBack*/public void addTopicData(Object tempInstance, String conTopic, TopicCallBack topicCallBack) {if (stompClient == null || compositeDisposable == null || instance == null) {return;}String disKey = tempInstance.getClass().getCanonicalName() + conTopic;if (callbackMap.containsKey(disKey)) {
//            Log.e(TAG, "该对象已存在相同监听:"+disKey);return;}String topic = "/exchange/exchange_transd.../key_...."+conTopic;callbackMap.put(disKey, topicCallBack);topicMap.put(disKey, topic);Disposable disposable = stompClient.topic(topic, headers).doOnError(throwable -> {Log.e(TAG, "doOnError on subscribe topic=" + topic + throwable);}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(stompMessage -> {Log.e(TAG, "stomp监听消息=" + stompMessage);if (callbackMap != null && callbackMap.size() > 0 && callbackMap.get(disKey) != null) {callbackMap.get(disKey).callBack(stompMessage);}}, throwable -> {Log.e(TAG, "throwable on subscribe topic" + throwable);});compositeDisposable.add(disposable);disposableMap.put(disKey, disposable);}

完整代码如下:

package com.example.yuntest.utils;import android.annotation.SuppressLint;
import android.os.CountDownTimer;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.text.TextUtils;
import android.util.ArrayMap;
import android.util.Log;import java.util.ArrayList;
import java.util.Map;import io.reactivex.CompletableObserver;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.StompClient;
import ua.naiksoftware.stomp.dto.StompCommand;
import ua.naiksoftware.stomp.dto.StompHeader;
import ua.naiksoftware.stomp.dto.StompMessage;public class StompClientUtil {public static final String URL = "ws://IP地址:端口号/ws";//连接地址public static final String MQTT_USERNAME = " "; // mqtt连接用户名public static final String MQTT_PASSWORD = " "; // mqtt连接密码private static final String TAG = "StompClientUtil";private static StompClientUtil instance;public CompositeDisposable compositeDisposable;private StompClient stompClient;private final int reconnectionMaxNum = 900; //重连最大次数 时间约等于一个小时private int reconnectionNum = 0; //重连次数private ArrayList<StompHeader> headers;
//    private NetChangeObserver mNetChangeObserver = null;private CountDownTimer timer;private Map<String, Disposable> disposableMap;private Map<String, TopicCallBack> callbackMap;private Map<String, String> topicMap;public static StompClientUtil getInstance() {if (instance == null) {synchronized (StompClientUtil.class) {if (instance == null) {instance = new StompClientUtil();}}}return instance;}@SuppressLint("CheckResult")public void initStompClient() {if (stompClient != null) {return;}disposableMap = new ArrayMap<>();callbackMap = new ArrayMap<>();topicMap = new ArrayMap<>();initComposite();stompClient = Stomp.over(Stomp.ConnectionProvider.OKHTTP, URL);stompClient.lifecycle().subscribe(lifecycleEvent -> {switch (lifecycleEvent.getType()) {case OPENED:reconnectionNum = 0;Log.e(TAG, "debug stomp 已连接");break;case ERROR:// javax.net.ssl.SSLException: Read error: ssl=0x7a879fa788 需要处理Log.e(TAG, " debug stomp 连接错误: " + lifecycleEvent.getException());if (reconnectionNum < reconnectionMaxNum) {handler.sendEmptyMessage(0);}break;case CLOSED:Log.e(TAG, " debug stomp closed: ");break;case FAILED_SERVER_HEARTBEAT:Log.e(TAG, " Stomp fail server heartbeat");break;}}, throwable -> Log.e(TAG, " Stomp connect Throwable:" + Log.getStackTraceString(throwable)));stompClient.withClientHeartbeat(1000 * 10).withServerHeartbeat(1000 * 10);connect();
//        getNetWork();}//    private void getNetWork() {
//        // 网络改变的一个回掉类
//        mNetChangeObserver = new NetChangeObserver() {
//            @Override
//            public void onNetConnected(com.caption.netmonitorlibrary.netStateLib.NetUtils.NetType type) {
//                if (type == com.caption.netmonitorlibrary.netStateLib.NetUtils.NetType.NONE) {
//                    onNetworkDisConnected();
//                    return;
//                }
//                onNetworkConnected(type);
//            }
//
//            @Override
//            public void onNetDisConnect() {
//                onNetworkDisConnected();
//            }
//        };
//        NetStateReceiver.registerObserver(mNetChangeObserver);
//    }
//
//    private void onNetworkConnected(NetUtils.NetType type) { //辅助一层,确保重连
//        if (!stompClient.isConnected() && timer == null && reconnectionNum < reconnectionMaxNum) {
//            handler.sendEmptyMessage(0);
//        }
//    }
//
//    private void onNetworkDisConnected() {
//
//    }public StompClient getStompClient() {return stompClient;}@SuppressLint("HandlerLeak")Handler handler = new Handler(Looper.getMainLooper()) {@Overridepublic void dispatchMessage(Message msg) {if (msg.what == 0) { //重连reConnect();} else if (msg.what == 1) { //重新添加监听Log.e(TAG, "重连中");reconnectionNum++;connect();reConnectTopic();}}};public void stompDiyReConnect() {if (stompClient == null) {initStompClient();}handler.sendEmptyMessage(0);}private void reConnect() {if (timer != null) {return;}timer = new CountDownTimer(4 * 1000, 1000) {@Overridepublic void onTick(long l) {}@Overridepublic void onFinish() {timer = null;stompClient.disconnectCompletable().subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "Disposable code:" + d.hashCode());}@Overridepublic void onComplete() {Log.e(TAG, "断开连接,准备重连");handler.sendEmptyMessage(1);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "Disconnect error:" + e);}});}};timer.start();}private void connect() {if (stompClient == null) {initStompClient();}headers = new ArrayList<>();headers.add(new StompHeader("name", MQTT_USERNAME));headers.add(new StompHeader("password", MQTT_PASSWORD));stompClient.connect(headers);}/*** @param tempInstance* @param conTopic* @param topicCallBack*/public void addTopicData(Object tempInstance, String conTopic, TopicCallBack topicCallBack) {if (stompClient == null || compositeDisposable == null || instance == null) {return;}String disKey = tempInstance.getClass().getCanonicalName() + conTopic;if (callbackMap.containsKey(disKey)) {
//            Log.e(TAG, "该对象已存在相同监听:"+disKey);return;}String topic = "/exchange/exchange_transd.../key_...."+conTopic;callbackMap.put(disKey, topicCallBack);topicMap.put(disKey, topic);Disposable disposable = stompClient.topic(topic, headers).doOnError(throwable -> {Log.e(TAG, "doOnError on subscribe topic=" + topic + throwable);}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(stompMessage -> {Log.e(TAG, "stomp监听消息=" + stompMessage);if (callbackMap != null && callbackMap.size() > 0 && callbackMap.get(disKey) != null) {callbackMap.get(disKey).callBack(stompMessage);}}, throwable -> {Log.e(TAG, "throwable on subscribe topic" + throwable);});compositeDisposable.add(disposable);disposableMap.put(disKey, disposable);}private void reConnectTopic() {if (callbackMap == null || stompClient == null || compositeDisposable == null|| callbackMap.size() < 1) {return;}Log.e(TAG, "重新监听");for (Map.Entry<String, TopicCallBack> entry : callbackMap.entrySet()) {String mapKey = entry.getKey();TopicCallBack mapValue = entry.getValue();Log.e(TAG, "key:" + mapKey);compositeDisposable.remove(disposableMap.get(mapKey));Disposable disposable = stompClient.topic(topicMap.get(mapKey), headers).doOnError(throwable -> {Log.e(TAG, "doOnError on subscribe topic=" + topicMap.get(mapKey) + throwable);handler.sendEmptyMessage(0);}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(stompMessage -> {Log.e(TAG, "stomp监听消息=" + stompMessage.getPayload());if (mapValue != null) {mapValue.callBack(stompMessage);}}, throwable -> {Log.e(TAG, "throwable on subscribe topic" + throwable);});compositeDisposable.add(disposable);disposableMap.put(mapKey, disposable);}}public void removeTopic(Object tempInstance, String conTopic) {if (compositeDisposable == null || stompClient == null) {return;}if (disposableMap == null || disposableMap.size() < 1 || callbackMap == null) {return;}String disKey = tempInstance.getClass().getCanonicalName() + conTopic;if (!disposableMap.containsKey(disKey)) {Log.e(TAG, "不存在对应监听");return;}topicMap.remove(disKey);callbackMap.remove(disKey);compositeDisposable.remove(disposableMap.get(disKey));}/*** 是否连接中* @return*/public boolean isConnected(){return stompClient.isConnected();}/*** 发送数据* @param topicKey 生成的随机数* @param data 发送数据*/public void sendMessage(String topicKey, String data){if (TextUtils.isEmpty(data)){return;}if (stompClient == null) {initStompClient();return;}if (!stompClient.isConnected()) {Log.i(TAG, "sendMessage 发现isConnected false");handler.sendEmptyMessage(0);return;}Log.i(TAG, "真的发送出去啦啦啦XXXXXXXXXXXXXXXXXX");String conTopic = "/exchange/exchange_web.../key_...." + topicKey;stompClient.send(conTopic, data).unsubscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onComplete() {Log.e(TAG, "STOMP echo send successfully");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "STOMP echo send onError" + e.getMessage());}});}/*** 示例:/topic/bid_list*/public void sendMessage(String body, SendMessageCallBack sendMessageCallBack) {if (stompClient == null) {initStompClient();return;}if (!stompClient.isConnected()) {handler.sendEmptyMessage(0);return;}stompClient.send(new StompMessage(StompCommand.SEND, headers, body)).unsubscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onComplete() {if (sendMessageCallBack != null) {sendMessageCallBack.onSuccess();}Log.e(TAG, "STOMP echo send successfully");}@Overridepublic void onError(Throwable e) {if (sendMessageCallBack != null) {sendMessageCallBack.onError();}Log.e(TAG, "STOMP echo send onError");}});}public interface TopicCallBack {void callBack(StompMessage stompMessage);}public interface SendMessageCallBack {void onSuccess();void onError();}private void initComposite() {if (compositeDisposable != null) {compositeDisposable.dispose();}compositeDisposable = new CompositeDisposable();}//取消订阅public void unSubcribe() {if (compositeDisposable != null) {compositeDisposable.dispose();}}public void stopStomp() {if (stompClient != null) {stompClient.disconnect();}if (timer != null) {timer.cancel();}if (disposableMap != null) {disposableMap.clear();disposableMap = null;}//        NetStateReceiver.removeRegisterObserver(mNetChangeObserver);unSubcribe();stompClient = null;timer = null;if (handler != null) {handler.removeMessages(0);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/126325.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DHorse改用fabric8的SDK与k8s集群交互

现状 在dhorse 1.4.0版本之前&#xff0c;一直使用k8s官方提供的sdk与k8s集群交互&#xff0c;官方sdk的Maven坐标如下&#xff1a; <dependency><groupId>io.kubernetes</groupId><artifactId>client-java</artifactId><version>18.0.0…

layui form表单 label和input宽度

情况是这样的&#xff0c;表单里有多个输入框&#xff0c;只有个别label 是长的&#xff0c;我就想调整一下个别长的&#xff0c;其它不变 <div class"layui-form-item"><label class"layui-form-label">是否分标分量:</label><div …

医学AI智能导诊系统源码

医院智能导诊系统是一款基于人工智能和大数据技术开发的医疗辅助软件&#xff0c;旨在为患者提供更加便捷、精准的医疗服务。 一、什么是智能导诊系统&#xff1f; 智能导诊系统是一种基于人工智能和大数据技术开发的医疗辅助软件&#xff0c;它能够通过对患者的症状、病史等信…

C# 通过Costura.Fody把DLL合并到exe程序中

打包独立的exe程序有多种方法&#xff0c;这里只说Costura.Fody。 我们用VS发布应用程序可以借助Costura.Fody直接打包成一个独立的exe程序&#xff0c;但是一些非托管的做了几次都没打进去&#xff0c;最后成功了&#xff0c;这里记录一下。 首先安装Costura.Fody 或者可以通…

UDP 协议

UDP协议特点&#xff1a; 无连接&#xff1a;只需要知道对方的IP和端口就可以向对方发送数据。 不可靠&#xff1a;没有确认应答机制&#xff0c;没有重传机制。 面向数据报&#xff1a;每调用一次sendto() &#xff0c;就直接将这些数据交给网络层了&#xff0c;不能控制数据读…

CSP-S 2023 T1密码锁 T2消消乐

T1 密码锁 对于一种状态&#xff0c;其可能产生的其他状态共有两种情况&#xff0c;一种情况是只转一个拨圈&#xff0c;这样产生的密码共有 5 ∗ 9 45 5*945 5∗945种&#xff0c;另一种情况是转相邻的两个拨圈&#xff0c;这样产生的密码共有 4 ∗ 9 36 4*936 4∗936种&am…

传奇引擎启动后没有正常运行如何解决

GOM引擎启动后M2空白进不去游戏 当我们版本架设好之后发现怎么都进不去游戏&#xff0c;登陆器也重新生成了还是进不去。这个错误是因为版本中的一些插件错误造成的&#xff0c;那么我们就找到插件的文件&#xff1a;D:\MirServer\Mir200 PlugList.txt打开后可以看到里面有.DL…

Collectors.toMap报错:空指针 key重复

Java 8中的stream在项目开发中被同学们用的风生水起&#xff0c;当然大家也踩了不少坑。下面我就来说说Collections.toMap在项目使用中踩的坑&#xff0c;避免大家重复被坑。 一.介绍Collectors.toMap Collectors.toMap 是 Java 8 中的一个收集器&#xff0c;它可以将流中的元…

C# OpenCvSharp DNN 部署L2CS-Net人脸朝向估计

效果 项目 代码 using OpenCvSharp; using OpenCvSharp.Dnn; using System; using System.Collections.Generic; using System.Drawing; using System.Drawing.Drawing2D; using System.Linq; using System.Text; using System.Windows.Forms;namespace OpenCvSharp_DNN_Demo …

hadoop hdfs的API调用,在mall商城代码中添加api的调用

在网上下载了现成的商城代码的源码 本次旨在熟悉hdfs的api调用&#xff0c;不关注前后端代码的编写&#xff0c;所以直接下载现成的代码&#xff0c;代码下载地址。我下载的是前后端在一起的代码&#xff0c;这样测试起来方便 GitHub - newbee-ltd/newbee-mall: &#x1f525; …

Seata入门系列【18】Seata集成Mybatis-Plus多数据源

1 前言 在使用单个服务&#xff0c;多数据源时&#xff0c;也存在分布式事务问题。 当单体系统需要访问多个数据库&#xff08;实例&#xff09;时就会产生分布式事务。 比如&#xff1a;用户信 息和订单信息分别在两个MySQL实例存储&#xff0c;用户管理系统删除用户信息&am…

驱动开发11-2 编写SPI驱动程序-点亮数码管

驱动程序 #include <linux/init.h> #include <linux/module.h> #include <linux/spi/spi.h>int m74hc595_probe(struct spi_device *spi) {printk("%s:%d\n",__FILE__,__LINE__);char buf[]{0XF,0X6D};spi_write(spi,buf,sizeof(buf));return 0; …

【SpringMVC篇】5种类型参数传递json数据传参

&#x1f38a;专栏【SpringMVC】 &#x1f354;喜欢的诗句&#xff1a;天行健&#xff0c;君子以自强不息。 &#x1f386;音乐分享【如愿】 &#x1f384;欢迎并且感谢大家指出小吉的问题&#x1f970; 文章目录 &#x1f33a;普通参数&#x1f33a;POJO参数&#x1f33a;嵌套…

2024年申报国自然项目基金撰写及技巧:基本要求和标准、项目撰写的方法和技巧、申请流程

随着社会经济发展和科技进步&#xff0c;基金项目对创新性的要求越来越高。申请人需要提出独特且有前瞻性的研究问题&#xff0c;具备突破性的科学思路和方法。因此&#xff0c;基金项目申请往往需要进行跨学科的技术融合。申请人需要与不同领域结合&#xff0c;形成多学科交叉…

Spring Boot 解决跨域问题的 5种方案

跨域问题本质是浏览器的一种保护机制&#xff0c;它的初衷是为了保证用户的安全&#xff0c;防止恶意网站窃取数据。 一、跨域三种情况 在请求时&#xff0c;如果出现了以下情况中的任意一种&#xff0c;那么它就是跨域请求&#xff1a; 协议不同&#xff0c;如 http 和 https…

nacos 常见问题整理包含容器环境

文章目录 0. nacos客户端日志文件位置最常见的问题1. 容器环境端口开放不够导致的问题原理解析 2.服务端启用了鉴权客户端常见错误信息如下服务端报错信息如下 其他一些问题0. nacos高版本服务端是否支持旧的客户端&#xff1f;1. Error code:503,msg:server is DOWN now, plea…

【UE5】如何在UE5.1中创建级联粒子系统

1. 可以先新建一个actor蓝图&#xff0c;然后在该蓝图中添加一个“Cascade Particle System Component” 2. 在右侧的细节面板中&#xff0c;点击“模板”一项中的下拉框&#xff0c;然后点击“Cascade粒子系统&#xff08;旧版&#xff09;” 然后就可以选择在哪个路径下创建级…

前端Vue框架系列—— 学习笔记总结Day04

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…

Java NIO为何导致堆外内存OOM了?

Java NIO为何导致堆外内存OOM了&#xff1f; 描述 某天报警&#xff1a;某台机器部署的一个服务突然无法访问。谨记第一反应登录机器查看日志&#xff0c;因为服务挂掉&#xff0c;很可能因OOM。这个时候在机器的日志中发现了如下的一些信息&#xff1a; nio handle failed j…

C++对象和类

类的基础 1.cpp中对象默认访问权限都是private的 2.私有成员只能通过公有函数访问使用 3.类方法名称需包含类名&#xff0c;为函数的限定名 在头文件中定义类&#xff0c;同时将公有方法原型声明(类似Java接口道理). #ifndef STOCK00_H_ #define STOCK00_H_ #include<s…