RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

发送同步消息

同步消息

生产者发送消息,mq进行确认,然后返回给生产者状态。这就是同步消息。

前文demo程序就是发送的同步消息。

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。

代码如下:

/*** 异步消息测试*/
@Test
public void simpleAsyncProducer() throws Exception {//创建一个生产者,并指定一个组名DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);//启动producer.start();//指定topic,创建一个消息Message message = new Message("asyncTopic1", "这是一条异步消息".getBytes());//发送异步消息,并设置回调内容producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("回调内容,发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("回调内容,发送失败");}});log.info("主线程执行中=========");System.in.read();
}

运行结果

从运行结果可以看到是不同的线程输出的内容。

发送单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。

代码如下:

@Test
public void oneWayMessageTest() throws Exception {//创建一个生产者,并指定一个组名DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);//启动producer.start();//指定topic,创建一个消息Message message = new Message("onewayTopic1", "这是一条单向消息".getBytes());//发送单向消息producer.sendOneway(message);producer.shutdown();
}

发送延迟消息

消息放入mq后,过一段时间,才会被监听到,然后消费.

比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

代码如下

@Test
public void msMessageTest() throws Exception{//创建一个生产者,并指定一个组名DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);//启动producer.start();//指定topic,创建一个消息Message message = new Message("msTopic1", "这是一条单向消息".getBytes());//给消息设置一个延迟时间message.setDelayTimeLevel(3);//发送延时消息producer.sendOneway(message);producer.shutdown();
}

延时等级如下:

消息延时等级

发送批量消息

代码如下:

@Test
public void testBatchProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-batch-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);// 启动实例producer.start();List<Message> msgs = Arrays.asList(new Message("batchTopicTest", "我是一组消息的A消息".getBytes()),new Message("batchTopicTest", "我是一组消息的B消息".getBytes()),new Message("batchTopicTest", "我是一组消息的C消息".getBytes()));SendResult send = producer.send(msgs);System.out.println(send);// 关闭实例producer.shutdown();
}

这些消息会被放到同一个队列中。

发送顺序消息

可以想象一个场景,我们在网上购物时,需要先完成下订单操作,然后再去发短信,再进行发货,需要保证顺序的。

前文我们讲的都是并发消息,这种消息并不能完成上述的场景逻辑。比如一个topic里有10个消息,分别在4个队列中;

  • 如果消费者,同时有20个线程在消费,可能A线程拿到消息1了,B线程拿到消息2了,但是B线程可能完成的比A线程早,这就没办法上述场景的顺序了。
  • 如果消费者只有一个线程,轮询消费四个队列中的消息时,也不能保证是网购场景中的顺序的。

这就要引出顺序消息:把消费者变为单线程,把下订单消息、发短信消息、发货消息放到同一个队列就可以了。

代码

消息封装成实体类如下:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageModel {//订单idprivate String orderId;//用户idprivate String userId;//消息描述private String description;
}

发送顺序消息的生产者代码如下:

/*** 顺序消息*/@Test
public void testOrderlyProducer() throws Exception {List<MessageModel> messageModelList = Arrays.asList(//用户1的订单new MessageModel("order-111","user-1","下单"),new MessageModel("order-111","user-1","发短信"),new MessageModel("order-111","user-1","发货"),//用户2的订单new MessageModel("order-222","user-2","下单"),new MessageModel("order-222","user-2","发短信"),new MessageModel("order-222","user-2","发货"));// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-orderly-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);// 启动实例producer.start();//发送顺序消息时 发送时相同用户的消息要保证有序,并且发到同一个队列里messageModelList.forEach(messageModel->{Message message = new Message("orderlyTopic", messageModel.toString().getBytes());try {//发送消息,相同订单号去相同队列producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {//producer.send(message,selector,arg),第三个参数订单号会传给selector要实现的方法的arg//在这里选择队列int hashCode = Math.abs(arg.toString().hashCode());int index = hashCode % mqs.size();return mqs.get(index);}}, messageModel.getOrderId());} catch (Exception e) {log.error("有错误发生",e);}});// 关闭实例producer.shutdown();log.info("发送完成");
}

消费顺序消息的消费者代码如下:

//消费者
@Test
public void orderlyConsumer() throws Exception {//创建一个消费者,并指定一个组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-orderly-consumer-group");//连接namesrv,参数是namesrv的ip地址:端口号consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);//订阅一个主题 *号表示订阅这个主题中所有的消息consumer.subscribe("orderlyTopic","*");//设置一个监听器(一直监听,异步回调方式)consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {log.info("线程id"+Thread.currentThread().getId());log.info("消息内容:"+new String(msgs.get(0).getBody()));return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();//挂起当前jvm,防止主线程结束,让监听器一直监听System.in.read();}

运行结果如下:

运行结果

可以看到同一个订单是顺序消费的。

其他问题

如果我们的消息消费失败了怎么办?

如果是并发模式,消费失败会进行重试,重试16次后还会没消费成功,会被放到死信队列里。

如果是顺序模式,如果重试失败,会无限重试,是int的最大值。

发送带标签的消息,消息过滤

如果我们有衣服订单的消息、手机订单的消息,如果我们只使用topic进行区分,就要使用两个topic;但是它们都是订单,所以在同一个topic中会好一些,Rocketmq就提供了消息过滤功能,通过tag或者key进行区分。

生产者代码如下:

@Test
public void testTagProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-tag-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);// 启动实例producer.start();Message messageTopic1 = new Message("tagTopic", "tag1", "这是topic1的消息".getBytes());Message messageTopic2 = new Message("tagTopic", "tag2", "这是topic2的消息".getBytes());producer.send(messageTopic1);producer.send(messageTopic2);// 关闭实例producer.shutdown();
}

消费tag1的消费者

//消费tag1的消费者
@Test
public void tagConsumer1() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);consumer.subscribe("tagTopic", "tag1");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我是tag1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

消费tag1和tag2的消费者

//消费tag1和tag2的消费者
@Test
public void tagConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);consumer.subscribe("tagTopic", "tag1 || tag2");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我是tag1和tag2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

带key的消息

消息都会有自己的MessageId的,如下图:

messageid

那我们能否指定id呢?

在发送消息时可以指定key:

@Test
public void testKeyProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-key-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);String key = UUID.randomUUID().toString();// 启动实例producer.start();Message messageTopic1 = new Message("keyTopic", "tag1",key, "这是topic1的消息".getBytes());producer.send(messageTopic1);// 关闭实例producer.shutdown();
}

消费者获取key:

@Test
public void testKeyConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);consumer.subscribe("keyTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我们设置的key:" + msgs.get(0).getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

输出如下:

输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/665937.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

gorm day1

gorm day1 gorm简介gorm声明模型 代码样例基本来自官方文档 Gorm简介 什么是ORM&#xff1f; 对象关系映射(Objection Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库(如mysql数据库&#xff09;存在的互不匹配现象的计数。简单来说&#xff0c;ORM是通…

计算机毕设医院挂号预约系统ssm

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; vue mybatis Maven mysql5.7或8.0等等组成&#xff0c;B…

【Redis】整理

对于现代大型系统而言&#xff0c;缓存是一个绕不开的技术话题&#xff0c;一提到缓存我们很容易想到Redis。 Redis整理&#xff0c;供回顾参考

单片机学习笔记---定时器/计数器(简述版!)

目录 定时器的介绍 定时计数器的定时原理 定时计数器的内部结构 两种控制寄存器 &#xff08;1&#xff09;工作方式寄存器TMOD &#xff08;2&#xff09;控制寄存器TCON 定时计数器的工作方式 方式0 方式1 方式2 方式3 定时器的配置步骤 第一步&#xff0c;对…

《幻兽帕鲁》好玩吗?幻兽帕鲁能在Mac上运行吗?

最近一款叫做《幻兽帕鲁》的新游戏走红&#xff0c;成为了Steam游戏平台上&#xff0c;连续3周的销量冠军&#xff0c;有不少Mac电脑用户&#xff0c;利用Crossover成功玩上了《幻兽帕鲁》&#xff0c;其实Crossover已经支持很多3A游戏&#xff0c;包括《赛博朋克2077》《博德之…

Nicn的刷题日常之字符串左旋(详细图解思路,多解法,建议三连收藏)

目录 1.题目描述 一 2.解题想法图解 2.1直接解 2.2巧解 3.题目描述二 3.1.1思路1 3.1.2 思路2 4.结语 1.题目描述 一 实现现一个函数&#xff0c;可以左旋字符串中的k个字符。 例如&#xff1a; ABCD左旋一个字符得到BCDA ABCD左旋两个字符得到CDAB 2.解题想法图解 2.…

使用wda框架实现IOS自动化测试详解

目录 1、weditor元素定位工具 1.1、weditor的安装和使用 2、wda iOS自动化框架 2.1、wda概述 2.2、wda安装 2.3、wda的使用 2.3.1、全局配置 2.3.2、创建客户端 2.3.3、APP相关操作 1、启动APP 2、关闭APP 3、获取APP状态信息 4、获取当前APP的运行信息 2.3.4、设…

【leetcode题解C++】98.验证二叉搜索树 and 701.二叉搜索树中的插入操作

98. 验证二叉搜索树 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含 小于 当前节点的数。节点的右子树只包含 大于 当前节点的数。所有左子树和右子树自身必须也是二叉搜索树。 示例…

MYSQL——MySQL8.3无法启动

在新电脑上装了个MySQL&#xff0c;但是无法使用net start mysql启动&#xff0c;很是纳闷&#xff0c;使用mysqld --console去查看报错&#xff0c;也是没报错的&#xff0c;但是奇怪的是&#xff0c;我输入完这个mysqld --console之后&#xff0c;就等于启动了mysql了&#x…

[python]基于opencv实现的车道线检测

【检测原理】 一、首先进行canny边缘检测&#xff0c;为获取车道线边缘做准备 二、进行ROI提取获取确切的车道线边缘&#xff08;红色线内部&#xff09; 三、利用概率霍夫变换获取直线&#xff0c;并将斜率正数和复数的线段给分割开来 四、离群值过滤&#xff0c;剔除斜率…

大数据平台-可视化面板介绍-Echarts

应对现在数据可视化的趋势&#xff0c;越来越多企业需要在很多场景(营销数据&#xff0c;生产数据&#xff0c;用户数据)下使用&#xff0c;可视化图表来展示体现数据&#xff0c;让数据更加直观&#xff0c;数据特点更加突出。 目录 01-使用技术 02- 案例适配方案 03-基础…

【大厂AI课学习笔记】1.4 算法的进步(4)关于李飞飞团队的ImageNet

第一个图像数据库是ImageNet&#xff0c;由斯坦福大学的计算机科学家李飞飞推出。ImageNet是一个大型的可视化数据库&#xff0c;旨在推动计算机视觉领域的研究。这个数据库包含了数以百万计的手工标记的图像&#xff0c;涵盖了数千个不同的类别。 基于ImageNet数据库&#xf…

Android之命令行烧写OTA镜像(一百八十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

【Leetcode】1690. 石子游戏 VII

文章目录 题目思路代码结果 题目 题目链接 石子游戏中&#xff0c;爱丽丝和鲍勃轮流进行自己的回合&#xff0c;爱丽丝先开始 。 有 n 块石子排成一排。每个玩家的回合中&#xff0c;可以从行中 移除 最左边的石头或最右边的石头&#xff0c;并获得与该行中剩余石头值之 和 相…

Vue基础知识七

一 路由 1.1 生活里的路由与路由器 是为了实现多台设备上网 1.2 程序里的路由与路由器 是为了实现导航区与展示区来回切换&#xff1b; SPA单页面应用&#xff1a;就像前几章节里的项目&#xff0c;整个项目只有一个html文件&#xff1b; 案例 注意&#xff0c;最开始的时候…

嵌入式学习 Day18

Linux软件编程: 1.Linux: 操作系统的内核 1.管理CPU 2.管理内存 3.管理硬件设备 4.管理文件系统 5.任务调度 2.Shell&#xff1a; 1.保护Linux内核(用户和Linux内核不直接操作,通过操作Shell,Shell和内核交互) 2.命令解释器 3…

STM32--SPI通信协议(2)W25Q64简介

一、W25Q64简介 1、W25Qxx中的xx是不同的数字&#xff0c;表示了这个芯片不同的存储容量&#xff1b; 2、存储器分为易失性与非易失性&#xff0c;主要区别是存储的数据是否是掉电不丢失&#xff1a; 易失性存储器&#xff1a;SRAM、DRAM&#xff1b; 非易失性存储器&#xff…

红队渗透靶机:LORD OF THE ROOT: 1.0.1

目录 信息收集 1、arp 2、nmap 3、knock 4、nikto 目录探测 1、gobuster 2、dirsearch WEB sqlmap 爆库 爆表 爆列 爆字段 hydra爆破 ssh登录 提权 信息收集 内核提权 信息收集 1、arp ┌──(root㉿ru)-[~/kali] └─# arp-scan -l Interface: eth0, ty…

参考数据集INRIA Holidays dataset

Download datasets 很贴心,MATLAB访问代码: % This function reads a siftgeo binary file % % Usage: [v, meta] = siftgeo_read (filename, maxdes) % filename the input filename % maxdes maximum number of descriptors to be loaded % (default=unlimit…

【微服务】Spring Boot集成ELK实用案例

推荐一款我一直在用国内很火的AI网站&#xff0c;包含GPT3.5/4.0、文心一言、通义千问、智谱AI等多个AI模型&#xff0c;支持PC、APP、VScode插件同步使用&#xff0c;点击链接跳转->ChatGPT4.0中文版 一、前言 在现代软件开发中&#xff0c;微服务架构已成为一种流行趋势。…