rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

一、RocketMq有3中消息类型

1.普通消费

2. 顺序消费

3.事务消费

顺序消费场景

在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费。

rocketMq实现顺序消费的原理

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

单个节点(Producer端1个、Consumer端1个)

1、Producer.java

packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,发送顺序消息*/

public classProducer {public static voidmain(String[] args) {try{

DefaultMQProducer producer= new DefaultMQProducer("order_Producer");

producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };

for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},0);

System.out.println(sendResult);

}

producer.shutdown();

}catch(MQClientException e) {

e.printStackTrace();

}catch(RemotingException e) {

e.printStackTrace();

}catch(MQBrokerException e) {

e.printStackTrace();

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

2、Consumer.java

packageorder;importjava.util.List;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;importcom.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;importcom.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.common.consumer.ConsumeFromWhere;importcom.alibaba.rocketmq.common.message.MessageExt;/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer1 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");

consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer1 Started.");

}

}

结果如下图所示:

b3e43a6a2d56708930f3977a9b8ada6c.png

这个五条数据被顺序消费了

多个节点(Producer端1个、Consumer端2个)

Producer.java

packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,发送顺序消息*/

public classProducer {public static voidmain(String[] args) {try{

DefaultMQProducer producer= new DefaultMQProducer("order_Producer");

producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };

for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},0);

System.out.println(sendResult);

}for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},1);

System.out.println(sendResult);

}for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},2);

System.out.println(sendResult);

}

producer.shutdown();

}catch(MQClientException e) {

e.printStackTrace();

}catch(RemotingException e) {

e.printStackTrace();

}catch(MQBrokerException e) {

e.printStackTrace();

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

Consumer1.java

/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer1 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到

*,第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer1 Started.");

}

}

Consumer2.java

/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer2 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到

*,第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer2 Started.");

}

}

先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息

Consumer1消费情况如图,都按照顺序执行了

4ffbd4eb8d4e6a82eb3be8c551f0738b.png

Consumer2消费情况如图,都按照顺序执行了

36d754b5c5172483626d8b22900d8a12.png

二、事务消费

这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。

事物消费需要先说说什么是事务。比如说:我们跨行转账,从工商银行转到建设银行,也就是我从工商银行扣除1000元之后,我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后,建设银行的服务器突然宕机,那么我扣除了1000,但是并没有在建设银行给我加1000,就出现了数据的不一致。因此加1000和减1000才行,减1000和减1000必须一起成功,一起失败。

再比如,我们进行网购的时候,我们下单之后,订单提交成功,仓库商品的数量必须减一。但是订单可能是一个数据库,仓库数量可能又是在另个数据库里面。有可能订单提交成功之后,仓库数量服务器突然宕机。这样也出现了数据不一致的问题。

使用消息队列来解决分布式事物:

现在我们去外面饭店吃饭,很多时候都不会直接给了钱之后直接在付款的窗口递饭菜,而是付款之后他会给你一张小票,你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似,提高了吞吐量。即使你到第二个窗口,师傅告诉你已经没饭了,你可以拿着这个凭证去退款,即使中途由于出了意外你无法到达窗口进行取饭,但是只要凭证还在,可以将钱退给你。这样就保证了数据的一致性。

如何保证凭证(消息)有2种方法:

1、在工商银行扣款的时候,余额表扣除1000,同时记录日志,而且这2个表是在同一个数据库实例中,可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户,建设银行收到之后给我返回已经加了1000给用户的确认信息之后,我再标记日志表里面的日志为已经完成。

2、通过消息中间件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/265614.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

列出我所知道的图像处理库

&#xff11;&#xff0e;对OpenCV的印象&#xff1a;功能十分的强大&#xff0c;而且支持目前先进的图像处理技术&#xff0c;体系十分完善&#xff0c;操作手册很详细&#xff0c;手册首先给大家补计算机视觉的知识&#xff0c;几乎涵盖了近10年内的主流算法&#xff1b;然后…

GPIO输入输出模式原理(八种工作方式附电路图详解)

这几篇博文讲的不错&#xff0c;可参照着理解&#xff1a; STM32下拉输入模式与振动传感器的使用 “上拉电阻与下拉电阻”通俗解读 上、下拉电阻&#xff08;定义、强弱上拉、常见作用、吸电流、拉电流、灌电流&#xff09; 个人总结&#xff1a; 模拟量选浮空输入&#xff0c…

webpack打包后引用cdn的js_利用CDN加速react webpack打包后的文件详解

此文不介绍webpack基本配置&#xff0c;如果对基本配置有疑问请查阅官方文档。1、配置webpack.config.js将output.publicPath改成上传到的cdn地址, 例(对应上面上传配置)&#xff1a;publicPath: "https://your_base_cdn_url" process.env.NODE_ENV "/cdn/&qu…

STM32F103构建固件库模板(PS固件库文件树介绍)

参考&#xff1a;STM32F103ZE新建固件库模板 作者&#xff1a;追兮兮 发布时间&#xff1a;2020-10-14 10:31:45 网址&#xff1a;https://blog.csdn.net/weixin_44234294/article/details/109065495 参考博文&#xff1a;https://blog.csdn.net/visual_eagle/article/details/…

java ee 中文乱码的问题

java ee 中文乱码的问题发生中文乱码的三种情况(一) 表单formPost 方法直接在服务器中设置request.setCharacterEncoding("utf-8");get方法 自己转码,下面是方法.public class HuanMa{public static String getUTF8(String str){String s"";try {s new St…

深度相机 物体三维重建_基于深度相机进行室内完整场景三维重建的方法及系统_2017100513665_说明书_专利查询_专利网_钻瓜专利网...

S121&#xff1a;采用Kintinuous框架&#xff0c;进行视觉里程计估计&#xff0c;得到每帧深度图像下的相机位姿信息。S122&#xff1a;根据相机位姿信息&#xff0c;将由每帧深度图像对应的点云数据反投影到初始坐标系下&#xff0c;用投影后得到的深度图像与初始帧的深度图像…

STM32F103实现点灯(固件库方式)

参考&#xff1a;stm32上实现点灯 作者&#xff1a;SKY丶丿平才 发布时间&#xff1a; 2021-03-20 16:51:06 网址&#xff1a;https://blog.csdn.net/weixin_48264057/article/details/115028724 目录前言一、硬件设计二、软件设计1.建立工程2.代码编写三、实际验证1.烧写程序2…

strtus2.3 java.lang.NoSuchFieldException: DEFAULT_PARAM

strtus2.3.15.1 的bug请下载 http://download.csdn.net/detail/livalue/6229373 或加群到群共享中下载.214579879

STM32震动感应控制继电器(使用循环VS使用外部中断EXTI和中断控制器NVIC)

参考&#xff1a;stm32的外部中断 震动感应 控制 继电器 作者&#xff1a;点灯小哥 发布时间&#xff1a; 2021-03-05 22:37:01 网址&#xff1a;https://blog.csdn.net/weixin_46016743/article/details/114417161 参考&#xff1a;STM32震动感应灯 作者&#xff1a;一只小阿大…

mysql 查找课程最高分_mysql 查询 学生id最高分的科目和日期

mysql>select*fromstudent;------------------------------|id|subject|mark|date|------------------------------|24|语文|56|2004-11||32|数学|74|2006-5||24|政治|100|2...mysql> select * from student;------------------------------| id | subject | mark | date…

我的lisp启程

Lisp语言仰慕已久&#xff0c;尤其是其作为人工智能的编程语言&#xff0c;感觉明显比其他语言高端不少&#xff0c;加之同班的同学也要自学lisp&#xff0c;就下定决心趁着在校的时候将lisp掌握。大一的时候接触python&#xff0c;觉得python语言特别难学&#xff0c;因为从来…

结构体与共用体(联合体)

参考&#xff1a;结构体与共用体 作者&#xff1a;一只青木呀 发布时间&#xff1a; 2020-08-09 08:29:22 网址&#xff1a;https://blog.csdn.net/weixin_45309916/article/details/107889394 参考&#xff1a;联合体&#xff08;union&#xff09;的使用方法及其本质 作者&am…

mysql 5.6.10 32_安装mysql-5.6.10-win32 解压版-略有修改

1.复制mysql-5.6.10-win32.zip到D:\app目录下.2.解压mysql-5.6.10-win32.zip3.在D:\app\mysql-5.6.10-win32下&#xff0c;复制“my-default.ini”文件&#xff0c;生成“复件 my-default.ini”文件。将“复件 my-default.ini”文件重命名成“my.ini” 。(注意&#xff1a;如果…

培养自信,笑赢未来

注&#xff1a;本文是儿子幼儿园小班第一个月中&#xff0c;老师要求每个学生家长都要交的命题作文。要求基于当前典型的421家庭中&#xff0c;如何培养孩子的自信心成文。为此&#xff0c;我作为家长第一次向老师提交了这篇作文。自信心对一个人的健康成长至关重要&#xff0c…

mysql中当前时间九点_MySQL 获得当前日期时间(以及时间的转换)

1.1 获得当前日期时间(date time)函数&#xff1a;now()除了 now() 函数能获得当前的日期时间外&#xff0c;MySQL 中还有下面的函数&#xff1a;current_timestamp() current_timestamplocaltime() localtimelocaltimestamp() localtimestamp这些日期时间函数&#xff0c;都等…

STM32串口原理、结构体、库函数、串口发送字符(串)、重定向printf串口发送、串口中断接收控制灯、接收不定长数据

参考&#xff1a;串口的结构体 重定向printf串口发送stm32等博文 作者&#xff1a;点灯小哥 发布时间&#xff1a; 2021-03-06 21:46:33 网址&#xff1a;https://blog.csdn.net/weixin_46016743/article/details/114458698 目录串口相关知识定义通信概念1.通讯结构2.电平标准3…

mysql kvdb_从MYSQL到KVDB

2016年4月10日 星期日 阴&#xff0c;雾霾惊悉新浪SAE又多收费了&#xff0c;凡是用Mysql的应用&#xff0c;每天至少要交22448颗云豆的租金。新浪为什么经常干这种事呢&#xff1f;用户会不会怀疑哪天又开始另找名目多收云豆&#xff1f;这跟食堂打菜员给人打菜一样&#xff0…

STM32通用定时器(原理、结构体、库函数、定时器中断每秒闪烁一次灯) —— 时钟源、分频值、重装载值

参考&#xff1a;stm32定时器与定时器中断 作者&#xff1a;打酱油的 发布时间&#xff1a; 2021-04-11 01:04:09 网址&#xff1a;https://blog.csdn.net/weixin_46098612/article/details/115493737 参考&#xff1a;stm32通用定时器结构体函数定时器实现led闪烁 作者&#x…

关于错误error C4430 error C2365 error C2078 error C2440 error C2143的处理。

关于错误error C4430 error C2365 error C2078 error C2440 error C2143的处理。 楼主在公司接手了一段代码。编译的时候发现一个很奇怪的问题&#xff0c;就是获取最新代码的时候能编译通过&#xff0c;然后下次就编译不通过了&#xff0c;提示如下错误。 后来发现问题出在std…

返回json数据

拦截器的配置&#xff0c;导致只能返回boolean类型的数据&#xff0c;那么要让前端知道在哪里被拦截了&#xff0c;拦截了什么&#xff0c;这就要用到json数据&#xff0c;返回相应的数据,package com.sysh.web.interceptor;/** * Created by sjy Cotter on 2018/7/24. */impor…