rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

一、RocketMq有3中消息类型

1.普通消费

2. 顺序消费

3.事务消费

顺序消费场景

在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费。

rocketMq实现顺序消费的原理

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

单个节点(Producer端1个、Consumer端1个)

1、Producer.java

packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,发送顺序消息*/

public classProducer {public static voidmain(String[] args) {try{

DefaultMQProducer producer= new DefaultMQProducer("order_Producer");

producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };

for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},0);

System.out.println(sendResult);

}

producer.shutdown();

}catch(MQClientException e) {

e.printStackTrace();

}catch(RemotingException e) {

e.printStackTrace();

}catch(MQBrokerException e) {

e.printStackTrace();

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

2、Consumer.java

packageorder;importjava.util.List;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;importcom.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;importcom.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.common.consumer.ConsumeFromWhere;importcom.alibaba.rocketmq.common.message.MessageExt;/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer1 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");

consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer1 Started.");

}

}

结果如下图所示:

b3e43a6a2d56708930f3977a9b8ada6c.png

这个五条数据被顺序消费了

多个节点(Producer端1个、Consumer端2个)

Producer.java

packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,发送顺序消息*/

public classProducer {public static voidmain(String[] args) {try{

DefaultMQProducer producer= new DefaultMQProducer("order_Producer");

producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };

for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},0);

System.out.println(sendResult);

}for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},1);

System.out.println(sendResult);

}for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},2);

System.out.println(sendResult);

}

producer.shutdown();

}catch(MQClientException e) {

e.printStackTrace();

}catch(RemotingException e) {

e.printStackTrace();

}catch(MQBrokerException e) {

e.printStackTrace();

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

Consumer1.java

/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer1 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到

*,第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer1 Started.");

}

}

Consumer2.java

/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer2 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到

*,第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer2 Started.");

}

}

先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息

Consumer1消费情况如图,都按照顺序执行了

4ffbd4eb8d4e6a82eb3be8c551f0738b.png

Consumer2消费情况如图,都按照顺序执行了

36d754b5c5172483626d8b22900d8a12.png

二、事务消费

这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。

事物消费需要先说说什么是事务。比如说:我们跨行转账,从工商银行转到建设银行,也就是我从工商银行扣除1000元之后,我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后,建设银行的服务器突然宕机,那么我扣除了1000,但是并没有在建设银行给我加1000,就出现了数据的不一致。因此加1000和减1000才行,减1000和减1000必须一起成功,一起失败。

再比如,我们进行网购的时候,我们下单之后,订单提交成功,仓库商品的数量必须减一。但是订单可能是一个数据库,仓库数量可能又是在另个数据库里面。有可能订单提交成功之后,仓库数量服务器突然宕机。这样也出现了数据不一致的问题。

使用消息队列来解决分布式事物:

现在我们去外面饭店吃饭,很多时候都不会直接给了钱之后直接在付款的窗口递饭菜,而是付款之后他会给你一张小票,你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似,提高了吞吐量。即使你到第二个窗口,师傅告诉你已经没饭了,你可以拿着这个凭证去退款,即使中途由于出了意外你无法到达窗口进行取饭,但是只要凭证还在,可以将钱退给你。这样就保证了数据的一致性。

如何保证凭证(消息)有2种方法:

1、在工商银行扣款的时候,余额表扣除1000,同时记录日志,而且这2个表是在同一个数据库实例中,可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户,建设银行收到之后给我返回已经加了1000给用户的确认信息之后,我再标记日志表里面的日志为已经完成。

2、通过消息中间件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/265614.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GPIO输入输出模式原理(八种工作方式附电路图详解)

这几篇博文讲的不错&#xff0c;可参照着理解&#xff1a; STM32下拉输入模式与振动传感器的使用 “上拉电阻与下拉电阻”通俗解读 上、下拉电阻&#xff08;定义、强弱上拉、常见作用、吸电流、拉电流、灌电流&#xff09; 个人总结&#xff1a; 模拟量选浮空输入&#xff0c…

STM32F103构建固件库模板(PS固件库文件树介绍)

参考&#xff1a;STM32F103ZE新建固件库模板 作者&#xff1a;追兮兮 发布时间&#xff1a;2020-10-14 10:31:45 网址&#xff1a;https://blog.csdn.net/weixin_44234294/article/details/109065495 参考博文&#xff1a;https://blog.csdn.net/visual_eagle/article/details/…

STM32F103实现点灯(固件库方式)

参考&#xff1a;stm32上实现点灯 作者&#xff1a;SKY丶丿平才 发布时间&#xff1a; 2021-03-20 16:51:06 网址&#xff1a;https://blog.csdn.net/weixin_48264057/article/details/115028724 目录前言一、硬件设计二、软件设计1.建立工程2.代码编写三、实际验证1.烧写程序2…

STM32震动感应控制继电器(使用循环VS使用外部中断EXTI和中断控制器NVIC)

参考&#xff1a;stm32的外部中断 震动感应 控制 继电器 作者&#xff1a;点灯小哥 发布时间&#xff1a; 2021-03-05 22:37:01 网址&#xff1a;https://blog.csdn.net/weixin_46016743/article/details/114417161 参考&#xff1a;STM32震动感应灯 作者&#xff1a;一只小阿大…

结构体与共用体(联合体)

参考&#xff1a;结构体与共用体 作者&#xff1a;一只青木呀 发布时间&#xff1a; 2020-08-09 08:29:22 网址&#xff1a;https://blog.csdn.net/weixin_45309916/article/details/107889394 参考&#xff1a;联合体&#xff08;union&#xff09;的使用方法及其本质 作者&am…

STM32串口原理、结构体、库函数、串口发送字符(串)、重定向printf串口发送、串口中断接收控制灯、接收不定长数据

参考&#xff1a;串口的结构体 重定向printf串口发送stm32等博文 作者&#xff1a;点灯小哥 发布时间&#xff1a; 2021-03-06 21:46:33 网址&#xff1a;https://blog.csdn.net/weixin_46016743/article/details/114458698 目录串口相关知识定义通信概念1.通讯结构2.电平标准3…

STM32通用定时器(原理、结构体、库函数、定时器中断每秒闪烁一次灯) —— 时钟源、分频值、重装载值

参考&#xff1a;stm32定时器与定时器中断 作者&#xff1a;打酱油的 发布时间&#xff1a; 2021-04-11 01:04:09 网址&#xff1a;https://blog.csdn.net/weixin_46098612/article/details/115493737 参考&#xff1a;stm32通用定时器结构体函数定时器实现led闪烁 作者&#x…

关于错误error C4430 error C2365 error C2078 error C2440 error C2143的处理。

关于错误error C4430 error C2365 error C2078 error C2440 error C2143的处理。 楼主在公司接手了一段代码。编译的时候发现一个很奇怪的问题&#xff0c;就是获取最新代码的时候能编译通过&#xff0c;然后下次就编译不通过了&#xff0c;提示如下错误。 后来发现问题出在std…

STM32端口复用和端口重映射

参考&#xff1a;stm32 端口复用和重映射 作者&#xff1a;点灯小哥 发布时间&#xff1a; 2021-03-09 13:49:19 网址&#xff1a;https://blog.csdn.net/weixin_46016743/article/details/114581032 目录端口复用1.什么是端口复用2. 如何配置端口复用(以PA9、PA10串口为例)端口…

Scala学习:Curry化的函数

2019独角兽企业重金招聘Python工程师标准>>> 在第1章&#xff0c;我们说过Scala允许你创建新的“感觉像是原生语言支持”的控制抽象。尽管到目前你已经看到的例子都的确是控制抽象&#xff0c;不过任何人都不会 误以为它们是原生语言支持的。为了搞明白如何让控制抽…

js 获取session_Python实战案例:这是你见过的最详细的JS加密登录某博

0x00 抓包分析简单的搜索之后发现&#xff0c;很多参数都是登陆上面这个请求返回的值&#xff0c;这个请求在输入完账号光标到达密码框时就会生成!0x01 加密逻辑分析搜索su可以很快找到加密的位置&#xff0c;上图看到e.su和e.sp都是由sinaSSOEncoder这个函数生成的&#xff0c…

STM32通用定时器输出PWM控制舵机 —— 重装载值、比较值、当前值

参考&#xff1a;stm32 定时器输出PWM原理及工作原理控制舵机 作者&#xff1a;点灯小哥 发布时间&#xff1a; 2021-03-09 23:17:52 网址&#xff1a;https://blog.csdn.net/weixin_46016743/article/details/114606662 参考&#xff1a;stm32之pwm输出 作者&#xff1a;打酱油…

STM32系统定时器SysTick(只能向下递减)延时闪烁灯

参考&#xff1a;stm32 系统定时器 SysTick 作者&#xff1a;点灯小哥 发布时间&#xff1a; 2021-03-10 13:46:00 网址&#xff1a;https://blog.csdn.net/weixin_46016743/article/details/114633245 参考&#xff1a;stm32之系统定时器 作者&#xff1a;打酱油的&#xff1b…

WMS学习笔记:1.尝试加载WMS

1.首先找一个可用的WMS栅格地图服务&#xff1a;http://demo.cubewerx.com/demo/cubeserv/cubeserv.cgi 获取GetCapabilities&#xff1a; http://demo.cubewerx.com/demo/cubeserv/cubeserv.cgi?serviceWMS&requestGetCapabilities 2.在ArcGIS中添加GIS空间服务 2.在ArcC…

mysqli 扩展_MySQLi的优势

1. 功能增加了2&#xff0c;效率大大增加(以后的PHP项目改成mysqli)3&#xff0c;更稳定mysqli使用面向对象技术&#xff0c;但也支持过程化的使用方式mysqli扩展中给我提供了三个类&#xff1a;1. mysqli和连接有关的类2. mysqli_result表达了对数据库的查询所返回的结果集。 …

STM32F4开发板硬件简介

更多干货推荐可以去牛客网看看&#xff0c;他们现在的IT题库内容很丰富&#xff0c;属于国内做的很好的了&#xff0c;而且是课程刷题面经求职讨论区分享&#xff0c;一站式求职学习网站&#xff0c;最最最重要的里面的资源全部免费&#xff01;&#xff01;&#xff01;点击进…

演示:EIGRP非等价负载均衡(故障分析与解决篇)

演示&#xff1a;EIGRP非等价负载均衡的故障分析与排除故障背景&#xff1a;在如下图14.20所示的网络环境&#xff0c;工程师完成了环境中所有路由器的接口地址配置有EIGRP动态路由协议的启动&#xff0c;目前每台EIGRP路由器邻居关系正常&#xff0c;路由学习正常&#xff0c;…

STM32超声波模块测距串口输出/通用定时器中断并输出PWM控制舵机/系统定时器延时

参考&#xff1a;stm32 超声波模块 原理 实现测距 舵机使用 作者&#xff1a;点灯小哥 发布时间&#xff1a; 2021-03-10 19:37:16 网址&#xff1a;https://blog.csdn.net/weixin_46016743/article/details/114643703 目录效果展示超声波传感器原理超声波测距编程步骤代码编写…

STM32F103五分钟入门系列(二)GPIO的七大寄存器+GPIOx_LCKR作用和配置

摘自&#xff1a;STM32F103五分钟入门系列&#xff08;二&#xff09;GPIO的七大寄存器GPIOx_LCKR作用和配置 作者&#xff1a;自信且爱笑‘ 发布时间&#xff1a; 2021-05-01 12:08:32 网址&#xff1a;https://blog.csdn.net/Curnane0_0/article/details/116276876?spm1001.…

STM32使用IIC总线通讯协议在OLED屏幕上显示字符串、汉字、图像(硬件IIC)

参考&#xff1a;基于STM32-Oled&#xff08;IIC&#xff09;的使用 作者&#xff1a;奋斗的小殷 发布时间&#xff1a; 2021-05-07 13:09:26 网址&#xff1a;https://blog.csdn.net/boybs/article/details/116465668 目录IIC总线简介IIC协议简介IIC总线系统结构IIC总线物理层…