Java with RocketMQ

Java with RocketMQ

  • 概念
  • 开始
  • 开发
    • 广播
    • 延时发送
    • 批量消息
    • 过滤消息
    • 事务
  • 如何保证消息不丢失
  • 如何存储和保证检索速度

概念

MQ指代Message Queue消息队列,通过在两个服务之间加入这种独立的消息队列应用,从而解耦不同服务之间的代码,使之可以通过熔断、限流等方式提供稳定可靠的高并发。
不同服务之间是通过发布与订阅的关系来生产和消费对应的消息,消息指代的是主题+标签+任意的数据内容。

发送消息的方式:

  • 同步,指生产者发送消息后等待结果返回,期间阻塞线程;
  • 异步,指生产者发送消息到队列后只留下成功与失败的异步回调入口,线程继续执行其他任务;
  • 单向,指生产者只发送消息但不关心后续情况,不期待结果。

接受消息的方式:

  • 推,指消费者处于被动监听,等待消息队列推送消息过来,这会对消息队列应用造成更大压力,且控制权在消息列队应用上而不在消费者服务;
  • 拉,指消费者主动去拉取消息,这样消费者服务拥有更多主动权,但也更容易对消费者造成额外性能损耗。

开始

  1. 可直接下载包安装或者通过Docker拉取镜像使用;
  2. 暴露PATH和mqnamesrv;
  3. 配置name server和broker后都启动;
  4. 使用内置的工具执行发送与接受消息队列测试;
  5. (可选)增加集群,配置多个worker,指定IP地址、密码、禁用防火墙、broker主从节点等;
  6. (可选)安装RocketMQ-Dashboard可以进行图形化界面管理RocketMQ;
  7. 在开发中引入并使用rocket-mq-client生产、消费消息。

第7点会在后面详细说明,前面的步骤需要先行完成。

开发

用到的关键代码:

  • DefaultMQProducer 生产者,需要指定组名、name server所在IP地址,启动后可以发送消息,用完需要关闭。
  • Message 消息载体,包含主题、标签、内容,用于提供给生产者要发送的消息。
  • MessageQueueSelector,发送消息选择器,producer.send()的参数,用于指定要发送哪个消息,可以传入索引值来让其按顺序发送消息(不使用时消息发送默认是随机的),需要在消费者中使用MessageListenerOrderly搭配消费处理。
  • DefaultPushMessageConsumer,推消息消费者,需要指定组名和name server服务所在IP地址和端口号,订阅主题与筛选标签后,添加监听器后启动可以接收和消费消息。
  • DefaultLitePullConsumer,拉消息消费者,可以随机获取一个消息队列或者指定一个消息队列
  • MessageListenerConcurrently,消息无序监听器
  • MessageListenerOrderly,消息有序监听器,需要在生产者中搭配MessageQueueSelector预先处理。

示例:

先在maven引入rocketmq-client库包

<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.1.0</version></dependency>
</dependencies>

定义用于发送消息的类方法:

public class TheProducer {public static void test(String content) {DefaultMQProducer producer = new DefaultMQProducer("JustAProducerGroupName");producer.setNamesrvAddr("192.168.43.137:9876");producer.start();CountDownLatch countDownLatch = new CountDownLatch(10);for (int i = 0; i < 10; i++) {Message msg = new Message("JustATopic", "SomeTags", (i + content).getBytes(StandardCharsets.UTF_8));//  同步发送,阻塞等待拿到发送结果,使用MessageQueueSelector让其按照循环顺序发送SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object n) {//   这里的n实际上send方法第三个实参传进来的索引值iInteger id = (Integer) n;int index = id % list.size();return list.get(index);}}, i);//  异步发送,注意没有使用MessageQueueSelector,此时消息看起来是按照循环的顺序发送,但实际并非如此而是随机的。producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.println("Success result: %d,%d", sendResult, i);}@Overridepublic void onException(Throwable throwable) {countDownLatch.countDown();System.out.println("Error: %d,%d", throwable.getStackTrace(), i);}});//  单向发送//  producer.sendOneway(msg);}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}

定义接受推送消息的类方法:

public class PushConsumer {public static void start() {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("JustAConsumerGroupName");consumer.setNamesrvAddr("192.168.43.137:9876");//  *号表示接受所有Tagsconsumer.subscribe("JustATopic", "*");//  添加了一个无序监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeContext) {list.forEachIndexed((msg, index) -> {System.out.println("Message:%d,%d", i, String.valueOf(msg.getBody())); });return ConsumeConcurrentlyStatus.CONSUME_SUCESS;}});consumer.start();}
}

定义拉取消息的类方法:

import java.util.Collection;public class RandomPullConsumer {public static void start() {DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("JustAConsumerGroupName");consumer.setNamesrvAddr("192.168.43.137:9876");//  指定主题和筛选标签,拉取时(调用poll方法)由消息队列应用随机提供一个返回consumer.subscribe("JustATopic", "*");consumer.start();while (true) {List<MessageExt> messageExtList = consumer.poll();System.out.println("Success get message");messageExt.forEach(msg -> {System.out.println("Message:%d", String.valueOf(msg.getBody()));});}}
}public class AppointPullConsumer {public static void start() {DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("JustAConsumerGroupName");consumer.setNamesrvAddr("192.168.43.137:9876");consumer.start();Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues("JustATopic");ArrayList<MessageQueue> messageQueueList = new ArrayList<>(messageQueues);consumer.assign(messageQueueList);//  从指定的队列里获取某个消息,这里是指定了队列ID并获取到10个为止consumer.seek(messageQueueList.get(0), 10);while (true) {List<MessageExt> messageExtList = consumer.poll();System.out.println("Success get message");messageExt.forEach(msg -> {System.out.println("Message:%d", String.valueOf(msg.getBody()));});}}
}

广播

通过设置consumer.setMessageModel()来决定,广播方式分为:

  • MessageModel.BROADCASTING 广播消息,一条消息会发送给所有订阅了对应主题的消费者,无论是否同一个组的消费者,相当于所有符合要求的消费者都会接受到该消息。
  • MessageModel.CLUSTERING 集群消息,一条消息只能被同一个消费者组里的一个实例消费,相当于同一个组里只有一个消费者只会接到一条该消息,但不同组都会接收到消息。
DefaultPushConsumer consumer = new DefaultPushConsumer("JustAGroupName");
consumer.setMessageModel(MessageModel.BROADCASTING);

延时发送

对Message使用以下任意一个方法可以设置延时:

  • message.setDelayTimeLevel,设置延时等级,可选1~8,对应1s,2s,…,2h
  • message.setDelayTimeMS,设置延时毫秒
Message message = new Message();
message.setDelayTimeLevel(2);

批量消息

producer.send是可以直接传递List实参来批量发送消息的,但要注意消息总大小不能超过4M,且性能最佳大小为1M。
可以将这种限制与优化协程一个迭代器来帮助将位置大小的批量消息切成合适大小,关键点在于计算已经封装好的message大小。

public class MessageIterator implements Iterator<List<Message>> {List<Message> messageList;private int currentIndex;private int maxMessageSize = 10 * 1000;MessageIterator(List<Message> messageList) {this.messageList = messageList;}public boolean hasNext() {return currentIndex < messageList.size();}public List<Message> next() {int nextIndex = currentIndex;int totalSize = 0;for (; nextIndex < messageList.size(); nextIndex++) {Message message = messageList.get(nextIndex);int logSize = 20;int messageSize = logSize + message.getBody().length + message.getTopic().length();Map<String, String> properties = message.getProerties();Iterator<Map.Entry<String, String>> propertiesIterator = properties.entrySet().iterator();while (propertiesIterator.hasNext()) {Map.Entry<String, String> entry = iterator.next();messageSize += entry.getKey().length() + entry.getValue().length();}if (messageSize > maxMessageSize) {if (nextIndex == currentIndex)nextIndex++;break;}if (messageSize + totalSize > maxMessageSize) {break;} else {totalSize += messageSize;}}List<Message> newMessageList = messageList.subList(currentIndex, nextIndex);currentIndex = nextIndex;}
}public class BatchProducer {public static void start() {DefaultMQProducer producer = new DefaultMQProducer("JustAGroupName");List<Message> messageList = new ArrayList<>();for (int i = 0; i < 1000; i++) {Message message = new Message("JustATopic", "someTags", ("SomeContent").getBytes(StandardCharsets.UTF_8));messageList.add(message);}MessageIterator messageIterator = new MessageIterator(messageList);while (messageIterator.hasNext()) {SendResult sendResult = producer.send(messageIterator.next());}}
}

过滤消息

  • Tag 标签,每条消息可以定义单个字符串作为标签,消费者可以且或进行过滤查询,比较简单。
  • Sql 查询语句,生产者可以设置自定义的属性,之后消费者使用类似SQL同时查询Tag和自定义属性是否满足条件,达成复杂查询。注意只有推模式消费者才能使用,过滤操作是在消息队列应用的broker里完成的,拉模式不可用。

生产者设置Tag:

Message message = new Message("JustATopic", "TagA", "Some content".getBytes(Standard.UTF_8));

消费者使用Tag过滤消息:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("JustAGroupName");
consumer.subscribe("JustATopic", "TagA");

生产者设置自定义属性,用于SQL查询:

message.putUserProperty("JustAName", "JustAValue");

消费者使用SQL查询Tag和自定义属性:

consumer.subscribe("JustATopic", MessageSelector.bySql("TAGS is not null and TAGS in ('TagA' and 'TagB') and (JustAName is not null and JustAName between 0 and 3)"));

事务

消息队列作为一个中间应用,让原本的代码-代码的程序内调用变成了服务-消息列队-服务横跨两三个程序的调用,过程中可能发生任何问题,因此事务就变成了很重要的一点。
创建方式是先实现TransactionListener事务监听器类,再使用TransactionMQProducer创建生产者,添加事务监听器类,并用sendMessageInTransaction发送消息和TransactionSendResult接收结果。
事务监听器类的作用是用于决定事务是否成功,broker在接受到sendMessageInTransaction发送的消息后,会将其暂存到“半消息主题”区,之后回访这个事务监听器,等接收到提交的信号或者经历15次回访都是没状态才真正将消息从“半消息主题”移动到真正的消息主题里;反之如果接收到回滚的信号则丢弃该”半消息“。
注意RocketMQ的事务消息不支持延时和批量。

public class TransactionListenerImpl implements TransactionListener {//  对所有消息的首次回查事务是否正常,此时根据情况可以暂时返回无状态@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {String tags = message.getTags();if (StringUtils.contains("JustATag", tags)) {return LocalTransactionState.COMIT_MESSAGE;}if (StringUtils.contains("SomeThingWrong", tags)) {return LocalTransactionSTate.ROLLBACK_MESSAGE;}return LocalTransactionSTate.UNKNOW;}//  对无状态消息的定时回查方法@Overridepublic LocalTransactionSTate checkLocalTransaction(MessageExt messageExt) {String tags = messageExt.getTags();if (StringUtils.contains("JustATag", tags)) {return LocalTransactionState.COMIT_MESSAGE;}if (StringUtils.contains("SomeThingWrong", tags)) {return LocalTransactionSTate.ROLLBACK_MESSAGE;}return LocalTransactionSTate.UNKNOW;}
}public class TheTransaction {public static void start() {TransactionMQProducer producer = new TransactionMQProducer("JustAGroupName");//  addThreadInTransaction(producer)    //  可以开启线程提升性能producer.addTransactionListener(new TransactionListenerImpl());Message message = new Message("JustATopic", "JustATag", "Some content".getBytes(Standard.UTF_8));TransactionSendResult transactionsendResult = producer.sendMessageInTransaction(message, null);}public static void addThreadInMQProducer(TransactionMQProducer producer) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("JustExecutorServiceName");return thread;}}));producer.addExecutorService(threadPoolExecutor);}
}

如何保证消息不丢失

消息存储过程:

  • 异步刷盘,broker接收到消息后存在内存后就返回成功,之后再存到硬盘
  • 同步刷盘,broker接收到消息后先存到硬盘,之后再返回成功

如何存储和保证检索速度

多个消息直接利用offset偏移量存储到同一个文件中,超过1G则另外新文件。
同时维护另一个索引值对应偏移量、标签对应索引值的列表,来确保可以根据需要进行范围查询或者筛选过滤查询。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/99678.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux | linux扩大磁盘空间 | centos7.9 | 虚拟机

注意&#xff1a;可以完全参考下面这边博客&#xff08;我只是搬运工&#xff09; centos扩大磁盘空间 简单讲讲&#xff0c;为什么有点失落落的&#xff1f; 明明就是一个 很程序化的东西 可是网上一大推 天花乱坠 而且很多人都是半吊子水&#xff0c;甚至半吊子都没有 通过关…

Ubuntu16.04apt更新失败

先设置网络设置 换成nat、桥接&#xff0c;如果发现都不行&#xff0c;那么就继续下面操作 1.如果出现一开始就e&#xff0c;检查源&#xff0c;先换源 2.换完源成功之后&#xff0c;ping网络&#xff0c;如果ping不通就是网络问题 如果ping baidu.com ping不通但是ping 112…

[网鼎杯 2018]Comment git泄露 / 恢复 二次注入 bash_history文件查看

首先我们看到账号密码有提示了 我们bp爆破一下 我首先对数字爆破 因为全字符的话太多了 爆出来了哦 所以账号密码也出来了 zhangwei zhangwei666 没有什么用啊 扫一下吧 有git git泄露 那泄露看看 真有 <?php include "mysql.php"; session_start(); if(…

leetCode 53.最大子数组和 动态规划 + 优化空间复杂度

关于此题我的往期文章&#xff1a; leetCode 53.最大子数和 图解 贪心算法/动态规划优化_呵呵哒(&#xffe3;▽&#xffe3;)"的博客-CSDN博客https://heheda.blog.csdn.net/article/details/13349726853. 最大子数组和 - 力扣&#xff08;LeetCode&#xff09; >&…

NSA 和 CISA 揭示十大网络安全错误配置

美国国家安全局 (NSA) 和网络安全与基础设施安全局 (CISA) 在5日公布了其红蓝团队在大型组织网络中发现的十大最常见的网络安全错误配置。 通报还详细介绍了威胁行为者使用哪些策略、技术和程序 (TTP) 来成功利用这些错误配置来实现各种目标&#xff0c;包括获取访问权限、横向…

40V汽车级P沟道MOSFET SQ4401EY-T1_GE3 工作原理、特性参数、封装形式—节省PCB空间,更可靠

AEC-Q101车规认证是一种基于失效机制的分立半导体应用测试认证规范。它是为了确保在汽车领域使用的分立半导体器件能够在严苛的环境条件下正常运行和长期可靠性而制定的。AEC-Q101认证包括一系列的失效机制和应力测试&#xff0c;以验证器件在高温、湿度、振动等恶劣条件下的可…

设计模式 - 行为型模式:责任链模式(概述 | 案例实现 | 优缺点 | 使用场景)

目录 一、行为型模式 1.1、责任链模式 1.1.1、概述 1.1.2、案例实现 1.1.3、优缺点 1.1.4、使用场景 一、行为型模式 1.1、责任链模式 1.1.1、概述 为了避免请求发送者和多个请求处理者耦合在一起&#xff0c;就将所有请求处理者通过前一个对象记住下一个对象的引用的方…

uniapp apple 苹果登录 离线本地打包

官方文档 uni-app官网 文档写的不全&#xff0c;没有写离线打包流程 加lib 签名里带 sign in with apple hbuilder开关 代码 测试代码&#xff0c;获取app里所有的provider uni.getProvider({service: oauth,success: function (res) {console.log(res.provider)uni.showT…

【HTML5】语义化标签记录

前言 防止一个页面中全部都是div&#xff0c;或者ul li&#xff0c;在html5推出了很多语义化标签 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 常用语义化案例 一般我用的多的是header&#xff0c;main&#xff0c;footer 这些标签不难理解&#x…

【学习笔记】数据结构算法文档(类C语言)

0、类C语言代码说明 // 函数结果状态代码 #define OK 1 #define ERROR 0 #define OVERFLOW -2// 函数返回值类型&#xff08;返回函数结果状态代码&#xff09; typedef int Status;// 用户自定义数据元素类型 ElemType typedef xxx ElemType;// C 引用&#xff08;示例&#…

2023年中国助消化药物行业现状分析:消化不良患者逐年上升,提升需求量[图]

助消化药物主要分为促胃动力药物、消化酶抑制剂、胃酸抑制药物和消食剂4种类型。促胃动力药物的作用机制是通过增强胃肠道平滑肌动力促进胃酸分泌&#xff0c;从而达到助消化的目的&#xff0c;临床常用药物包括多潘立酮、莫沙必利、西沙比利等。 助消化药物分类 资料来源&…

Observability:使用 OpenTelemetry 对 Node.js 应用程序进行自动检测

作者&#xff1a;Bahubali Shetti DevOps 和 SRE 团队正在改变软件开发的流程。 DevOps 工程师专注于高效的软件应用程序和服务交付&#xff0c;而 SRE 团队是确保可靠性、可扩展性和性能的关键。 这些团队必须依赖全栈可观察性解决方案&#xff0c;使他们能够管理和监控系统&a…

Django开发之进阶篇

Django进阶篇 一、Django学习之模板二、Django学习之中间件默认中间件自定义中间件 三、Django学习之ORM定义模型类生成数据库表操作数据库添加查询修改删除 一、Django学习之模板 在 Django 中&#xff0c;模板&#xff08;Template&#xff09;是用于生成动态 HTML&#xff…

【架构】研发高可用架构和系统设计经验

研发高可用架构和系统设计经验 从研发规范层面、应用服务层面、存储层面、产品层面、运维部署层面、异常应急层面这六大层面去剖析一个高可用的系统需要有哪些关键的设计和考虑。 一、高可用架构和系统设计思想 1.可用性和高可用概念 可用性是一个可以量化的指标,计算的公…

Java 8遍历Map的方式

1、使用entrySet()和stream()方法结合遍历Map Map<String, String> map new HashMap<>();map.put("A001", "zhangsan");map.put("A002", "lisi");map.entrySet().stream().forEach(entry -> {String key entry.getKe…

自动拟人对话机器人在客户服务方面起了什么作用?

在当今数字时代&#xff0c;企业不断寻求创新的方法来提升客户服务体验。随着科技的不断进步和消费者期望的提升&#xff0c;传统的客户服务方式逐渐无法满足现代消费者的需求。因此&#xff0c;许多企业正在积极探索利用新兴技术来改进客户服务&#xff0c;自动拟人对话机器人…

LuatOS-SOC接口文档(air780E)-- gmssl - 国密算法

sm.sm2encrypt(pkx,pky,data)# sm2算法加密 参数 传入值类型 解释 string 公钥x,必选 string 公钥y,必选 string 待计算的数据,必选,最长255字节 返回值 返回值类型 解释 string 加密后的字符串, 原样输出,未经HEX转换 例子 local originStr "encryptio…

新增一个timestamp.html 页面 --chatGPT

问&#xff1a;新增一个timestamp.html 页面&#xff0c;页面实现日期和时间戳 互转功能 gpt: 要创建一个 timestamp.html 页面&#xff0c;用于实现日期和时间戳的互转功能&#xff0c;可以按照以下步骤操作&#xff1a; 1. 创建一个名为 timestamp.html 的 HTML 文件&…

阿里云轻量应用服务器流量价格表(计费/免费说明)

阿里云轻量应用服务器套餐有的限制月流量&#xff0c;有的不限制月流量&#xff0c;限制每月流量的套餐&#xff0c;如果自带的免费月流量包用完了&#xff0c;流量超额部分需要另外支付流量费&#xff0c;阿里云百科aliyunbaike.com分享阿里云轻量应用服务器月流量超额收费价格…

phpstorm不提示$this->request,不提示Controller父类的方法

![在这里插入图片描述](https://img-blog.csdnimg.cn/d55799a22b724099930eb7fb67260a12.png 最后 保存就可以了