rocketmq 顺序消息原理与实战

消费者pull和push

pull 为主动从broker获取消息
Push为broker主动推送消息个consumer 实时性更高,但流量要自己控制
PullBatchSize,代表的是每次从broker的一个队列上拉取的最大消息数。
consumeThreadMax 和 consumeThreadMin 代表消费者pull消息时需要的线程最大和最小数量

广播模式和集群模式

广播把消息发送给订阅这个主题的所有消费者
广播消息不支持消息重试

集群是消息只要有一个消费者消费后变算为消费成功
顺序消息必须为集群模式

顺序消息:

Rocketmq全局顺序消息和局部顺序
全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费。
部分顺序:一个部分内所有的消息按照先进先出的顺序进行发布和消费。

三个阶段保证消息顺序:
生产顺序性:单一的生产者 并且 串行发送消息
存储时保持和发送的顺序一致
消费时保持和存储的顺序一致

关于重试:

顺序消息可以设置最大重试次数,若不设置则可以认为是无限次
若设置,则达到最大重试次数时消息会变为已消费,会执行后序消息,可能无法保证消息的顺序性,所以要做好顺序验证

顺序消息的重试间隔为固定时间
无序消息的重试时间为阶梯时间

重试次数可通过maxReconsumeTimes 参数进行设置

广播消息不可重试

全局顺序

生产者
创建topic时只创建一个queue,所有的消息都保存在同一个broker里就可以保证顺序
或者选择其中一个队列也可以
生产者就和普通的没区别

producer.send(message);

消费者

@PostConstruct
public void init{consumer = new DefaultMQPushConsumer("group");consumer.setNamesrvAddr("1270.0.1:9876");consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setPullInterval(2000);consumer.setPullBatchSize(100);// 顺序消息设置为1,多个其他的会被空置consumer.setConsumeThreadMin(1);try {consumer.subscribe("topic", "tag");} catch (MQClientException e) {throw new RuntimeException(e);}consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {if (CollectionUtils.isEmpty(msgs)) {return ConsumeOrderlyStatus.SUCCESS;}for (MessageExt msg : msgs) {try {// 处理message消息} catch (Exception e) {log.info("Consumer message failed", e);throw new RuntimeException(e);}}return ConsumeOrderlyStatus.SUCCESS;});try {consumer.start();} catch (Exception e) {throw new RuntimeException(e);}
}

局部顺序

假设场景,一个订单的不同操作需要保证顺序,比如订单生成->支付->完成
此时方法中arg参数传订单号即可,保证需要顺序的消息有一个统一的标识可以进入到同一个队列中
顺序消息的逻辑就是通过统一标识的hashcode和队列数量size进行取余操作
所以顺序消息有个前提是这个topic的队列数量不可随意修改(倍数可以),否则顺序消息会出现异常(可提前设置此topic的队列数量为最大值16)

生产者

public void sendMsg(MessageInfo info,Object arg){String json = JSON.toJSONString(info);Message message = new Message("messageType", json.getBytes());try {producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {int value = arg.hashCode() % list.size();if (value < 0) {value = Math.abs(value);}return list.get(value);}}, arg);} catch (Exception e) {throw new RuntimeException(e);} 
}

消费者
同全局顺序消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/868251.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI产品经理发展与规划

今天引用高飞老师的讲课内容&#xff0c;分享一下&#xff0c;何为AI产品经理&#xff1f;这个话题不仅仅希望介绍AI产品经理的工作方式等方面的内容&#xff0c;更多的在于讨论未来产品经理这个行业应该如何发展&#xff1f;行业壁垒在何处&#xff1f;如何应对中年危机&#…

嵌入式驱动源代码(8):IO 模拟 SPI

目录 1、SPI初始化函数 2、SPI写函数 3、SPI写字节 4、spi读字节 5、spi写字节 SPI是一种通用的通信方式,很多时候为了节省成本,选择的主控制器资源不足。 这个时候外设需要用到SPI,就只能通过io模拟的方式来实现了。 1、SPI初始化函数 /*****************************…

Flume工具详解

Flume是一个由Apache提供的开源日志收集系统&#xff0c;最初由Cloudera贡献。它以其高可用性、高可靠性和分布式特性而著称&#xff0c;被广泛应用于海量日志的采集、聚合和传输。以下是对Flume工具的详细解析&#xff1a; 一、概述 功能定位&#xff1a;Flume主要用于收集、…

名企面试必问30题(二十六)——毕业这么久了,为什么还没有找到工作?

回答一&#xff1a; “毕业后的这段时间&#xff0c;我一直在努力寻找最适合自己发展的岗位。我没有急于随便接受一份工作&#xff0c;而是希望能够进入一个与我的专业技能和职业规划高度匹配的公司。在这个过程中&#xff0c;我不断提升自己的技术能力&#xff0c;学习新的测…

PyQT: 开发一款ROI绘制小程序

在一些基于图像或者视频流的应用中&#xff0c;比如电子围栏/客流统计等&#xff0c;我们需要手动绘制一些感兴趣&#xff08;Region of Interest&#xff0c;简称ROI&#xff09;区域。 在这里&#xff0c;我们基于Python和PyQt5框架开发了一款桌面应用程序&#xff0c;允许用…

c#类型转换和常见集合类型

目录 1. 整数转换&#xff0c;整数和字符串&#xff0c;字符串和整数之间的转换怎么实现&#xff1f; 2. 日期转换&#xff0c;获取当前日期&#xff0c;字符串转日期&#xff0c;日期转字符串怎么实现&#xff1f; 3. 举例一维、二维、三维数组 4. 需求&#xff1a;有个88…

事务(数据库)

是一组操作的集合&#xff0c;是一个不可分割的工作单位&#xff0c;事物会把所有的操作作为一个整体一起向系统提交或撤销操作请求&#xff0c;这些操作要么同时成功&#xff0c;要么同时失败 create table account(id int auto_increment primary key comment 主键ID,name va…

VPN 的入门介绍

VPN&#xff08;虚拟专用网络&#xff09; 简介 虚拟专用网络&#xff0c;简称虚拟专网&#xff08;VPN&#xff09;&#xff0c;其主要功能是在公用网络上建立专用网络&#xff0c;进行加密通讯。在企业网络中有广泛应用。VPN网关通过对数据包的加密和数据包目标地址的转换实…

14-48 剑和诗人22 - RAG 的主要痛点和解决方案

​​​​​ 检索增强生成 (RAG) 模型已成为一种有前途的方法&#xff0c;它利用存储在文档中的外部知识来提高生成文本的准确性和相关性。通过检索和调节相关的上下文文档&#xff0c;与传统语言模型相比&#xff0c;RAG 模型可以产生更真实、更深入和更具体的响应。 然而&…

诸葛亮的空城计 - 代理模式

定场诗 “无形之中蕴含至理&#xff0c;虚实相生方见大道。” 在三国演义中&#xff0c;诸葛亮的空城计可谓神来之笔。这看似冒险的策略&#xff0c;实则蕴含深意。今天&#xff0c;我们将透过空城计&#xff0c;一窥软件设计中代理模式的奥秘。 西城无人旦夕危&#xff0c;…

君方智能设计平台-事务管理技术方案

1.背景介绍 事务处理是指对数据进行一组操作&#xff0c;这些操作要么全部成功&#xff0c;要么全部失败&#xff0c;以确保数据的一致性和完整性。软件的事务管理主要实现方案主要涉及以下几个方面&#xff1a; &#xff08;1&#xff09;数据一致性&#xff1a;在CAD软件中…

STM32实现看门狗(HAL库)

文章目录 一. 看门狗1. 独立看门狗&#xff08;IWDG&#xff09;1.1 原理1.2 相关配置1.3 相关函数 2. 窗口看门狗&#xff08;WWDG&#xff09;2.1 原理2.2 相关配置2.3 相关函数 一. 看门狗 单片机在日常工作中常常会因为用户配置代码出现BUG&#xff0c;而导致芯片无法正常工…

Flask项目搭建及部署(完整版!全网最全)

flask搭建及部署 pip 19.2.3 python 3.7.5 Flask 1.1.1 Flask-SQLAlchemy 2.4.1 Pika 1.1.0 Redis 3.3.11 flask-wtf 0.14.2 1、创建flask项目&#xff1a; 创建完成后整个项目结构树&#xff1a; app.py: 项⽬管理⽂件&#xff0c;通过它管理项⽬。 static: 存放静态文…

map和set的原理、优劣势、应用场景和示例代码,统统告诉你。

map和set的原理都是基于哈希表实现的&#xff0c;通过哈希值来快速查找和插入元素&#xff0c;从而实现高效的数据存储和管理&#xff0c;那么他们之间有什么不同呢&#xff0c;该如何选择&#xff0c;本文带你了解。 一、map和set的原理 map和set都是数据结构&#xff0c;用…

【分布式系统三】监控平台Zabbix对接grafana(截图详细版)

目录 一.安装grafana并启动 二.浏览器访问 三.导入zabbix数据&#xff0c;对接grafana 四.如何导入模版 以前两篇博客为基础 【分布式系统】监控平台Zabbix介绍与部署&#xff08;命令截图版&#xff09;-CSDN博客 【分布式系统】监控平台Zabbix自定义模版配置-CSDN博客 …

白骑士的C语言教学高级篇 3.2 高级数据结构

系列目录 上一篇&#xff1a;白骑士的C语言教学高级篇 3.1 高级指针技术 在计算机科学中&#xff0c;数据结构是组织和存储数据的方式&#xff0c;不同的数据结构适用于不同的问题和算法。本节将介绍链表、栈与队列以及树与图&#xff0c;这些高级数据结构在实际编程中非常常用…

java ReadWriteLock接口

在 Java 中&#xff0c;ReadWriteLock 接口的实现类ReentrantReadWriteLock 类提供了一种允许多个线程同时读取某一资源但只允许一个线程写的锁定机制。这种机制可以提高并发性能&#xff0c;特别是在读操作远多于写操作的场景下。 特性&#xff1a; 可重入&#xff1b;不存…

使用Redis实现缓存穿透的解决方案

使用Redis实现缓存穿透的解决方案 大家好&#xff0c;我是微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在缓存系统中&#xff0c;缓存穿透是指访问不存在的数据&#xff0c;导致请求直接穿透缓存层&#xff0c;直接访问…

前端使用Threejs加载机械臂并控制机械臂跳舞

1. 前言 在我的第一篇博客中,大致讲解了如何使用threejs导入机械臂模型,以及如何让机械臂模型动起来的案例,可以看一下之前的博客前端使用Threejs控制机械臂模型运动 本篇博客主要讲解的是在原来的基础上添加GSAP动画库的应用,可以通过动画,来让机械臂进行指定轨迹位姿的运动…

基于SpringBoot的休闲娱乐代理售票系统

本系统主要包括管理员和用户两个角色组成&#xff1b;主要包括&#xff1a;首页、个人中心、用户管理、折扣票管理、分类管理、订单信息管理、退票信息管理、出票信息管理、系统管理等功能的管理系统。 &#x1f495;&#x1f495;作者&#xff1a;Weirdo &#x1f495;&#x…