RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

发送同步消息

同步消息

生产者发送消息,mq进行确认,然后返回给生产者状态。这就是同步消息。

前文demo程序就是发送的同步消息。

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。

代码如下:

/*** 异步消息测试*/
@Test
public void simpleAsyncProducer() throws Exception {//创建一个生产者,并指定一个组名DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);//启动producer.start();//指定topic,创建一个消息Message message = new Message("asyncTopic1", "这是一条异步消息".getBytes());//发送异步消息,并设置回调内容producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("回调内容,发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("回调内容,发送失败");}});log.info("主线程执行中=========");System.in.read();
}

运行结果

从运行结果可以看到是不同的线程输出的内容。

发送单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。

代码如下:

@Test
public void oneWayMessageTest() throws Exception {//创建一个生产者,并指定一个组名DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);//启动producer.start();//指定topic,创建一个消息Message message = new Message("onewayTopic1", "这是一条单向消息".getBytes());//发送单向消息producer.sendOneway(message);producer.shutdown();
}

发送延迟消息

消息放入mq后,过一段时间,才会被监听到,然后消费.

比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

代码如下

@Test
public void msMessageTest() throws Exception{//创建一个生产者,并指定一个组名DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);//启动producer.start();//指定topic,创建一个消息Message message = new Message("msTopic1", "这是一条单向消息".getBytes());//给消息设置一个延迟时间message.setDelayTimeLevel(3);//发送延时消息producer.sendOneway(message);producer.shutdown();
}

延时等级如下:

消息延时等级

发送批量消息

代码如下:

@Test
public void testBatchProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-batch-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);// 启动实例producer.start();List<Message> msgs = Arrays.asList(new Message("batchTopicTest", "我是一组消息的A消息".getBytes()),new Message("batchTopicTest", "我是一组消息的B消息".getBytes()),new Message("batchTopicTest", "我是一组消息的C消息".getBytes()));SendResult send = producer.send(msgs);System.out.println(send);// 关闭实例producer.shutdown();
}

这些消息会被放到同一个队列中。

发送顺序消息

可以想象一个场景,我们在网上购物时,需要先完成下订单操作,然后再去发短信,再进行发货,需要保证顺序的。

前文我们讲的都是并发消息,这种消息并不能完成上述的场景逻辑。比如一个topic里有10个消息,分别在4个队列中;

  • 如果消费者,同时有20个线程在消费,可能A线程拿到消息1了,B线程拿到消息2了,但是B线程可能完成的比A线程早,这就没办法上述场景的顺序了。
  • 如果消费者只有一个线程,轮询消费四个队列中的消息时,也不能保证是网购场景中的顺序的。

这就要引出顺序消息:把消费者变为单线程,把下订单消息、发短信消息、发货消息放到同一个队列就可以了。

代码

消息封装成实体类如下:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageModel {//订单idprivate String orderId;//用户idprivate String userId;//消息描述private String description;
}

发送顺序消息的生产者代码如下:

/*** 顺序消息*/@Test
public void testOrderlyProducer() throws Exception {List<MessageModel> messageModelList = Arrays.asList(//用户1的订单new MessageModel("order-111","user-1","下单"),new MessageModel("order-111","user-1","发短信"),new MessageModel("order-111","user-1","发货"),//用户2的订单new MessageModel("order-222","user-2","下单"),new MessageModel("order-222","user-2","发短信"),new MessageModel("order-222","user-2","发货"));// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-orderly-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);// 启动实例producer.start();//发送顺序消息时 发送时相同用户的消息要保证有序,并且发到同一个队列里messageModelList.forEach(messageModel->{Message message = new Message("orderlyTopic", messageModel.toString().getBytes());try {//发送消息,相同订单号去相同队列producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {//producer.send(message,selector,arg),第三个参数订单号会传给selector要实现的方法的arg//在这里选择队列int hashCode = Math.abs(arg.toString().hashCode());int index = hashCode % mqs.size();return mqs.get(index);}}, messageModel.getOrderId());} catch (Exception e) {log.error("有错误发生",e);}});// 关闭实例producer.shutdown();log.info("发送完成");
}

消费顺序消息的消费者代码如下:

//消费者
@Test
public void orderlyConsumer() throws Exception {//创建一个消费者,并指定一个组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-orderly-consumer-group");//连接namesrv,参数是namesrv的ip地址:端口号consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);//订阅一个主题 *号表示订阅这个主题中所有的消息consumer.subscribe("orderlyTopic","*");//设置一个监听器(一直监听,异步回调方式)consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {log.info("线程id"+Thread.currentThread().getId());log.info("消息内容:"+new String(msgs.get(0).getBody()));return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();//挂起当前jvm,防止主线程结束,让监听器一直监听System.in.read();}

运行结果如下:

运行结果

可以看到同一个订单是顺序消费的。

其他问题

如果我们的消息消费失败了怎么办?

如果是并发模式,消费失败会进行重试,重试16次后还会没消费成功,会被放到死信队列里。

如果是顺序模式,如果重试失败,会无限重试,是int的最大值。

发送带标签的消息,消息过滤

如果我们有衣服订单的消息、手机订单的消息,如果我们只使用topic进行区分,就要使用两个topic;但是它们都是订单,所以在同一个topic中会好一些,Rocketmq就提供了消息过滤功能,通过tag或者key进行区分。

生产者代码如下:

@Test
public void testTagProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-tag-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);// 启动实例producer.start();Message messageTopic1 = new Message("tagTopic", "tag1", "这是topic1的消息".getBytes());Message messageTopic2 = new Message("tagTopic", "tag2", "这是topic2的消息".getBytes());producer.send(messageTopic1);producer.send(messageTopic2);// 关闭实例producer.shutdown();
}

消费tag1的消费者

//消费tag1的消费者
@Test
public void tagConsumer1() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);consumer.subscribe("tagTopic", "tag1");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我是tag1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

消费tag1和tag2的消费者

//消费tag1和tag2的消费者
@Test
public void tagConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);consumer.subscribe("tagTopic", "tag1 || tag2");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我是tag1和tag2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

带key的消息

消息都会有自己的MessageId的,如下图:

messageid

那我们能否指定id呢?

在发送消息时可以指定key:

@Test
public void testKeyProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-key-group");//连接namesrv,参数是namesrv的ip地址:端口号producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);String key = UUID.randomUUID().toString();// 启动实例producer.start();Message messageTopic1 = new Message("keyTopic", "tag1",key, "这是topic1的消息".getBytes());producer.send(messageTopic1);// 关闭实例producer.shutdown();
}

消费者获取key:

@Test
public void testKeyConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);consumer.subscribe("keyTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我们设置的key:" + msgs.get(0).getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

输出如下:

输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/665937.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

gorm day1

gorm day1 gorm简介gorm声明模型 代码样例基本来自官方文档 Gorm简介 什么是ORM&#xff1f; 对象关系映射(Objection Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库(如mysql数据库&#xff09;存在的互不匹配现象的计数。简单来说&#xff0c;ORM是通…

计算机毕设医院挂号预约系统ssm

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; vue mybatis Maven mysql5.7或8.0等等组成&#xff0c;B…

定时删除指定文件夹及子文件夹 确保硬盘不会被占满 bat脚本

如果你想要一个批处理脚本&#xff0c;该脚本可以在定时删除指定文件夹及其子文件夹的同时确保硬盘不会被占满&#xff0c;你可以使用以下脚本&#xff1a; echo off set "target_folderC:\path\to\folder" set "days_to_keep7" set "max_space_to_us…

【Redis】整理

对于现代大型系统而言&#xff0c;缓存是一个绕不开的技术话题&#xff0c;一提到缓存我们很容易想到Redis。 Redis整理&#xff0c;供回顾参考

解释 Python 中的描述符(Descriptor)是什么?如何在 Python 中实现一个简单的 ORM(对象关系映射)?

解释 Python 中的描述符&#xff08;Descriptor&#xff09;是什么&#xff1f;举例说明其用法。 在 Python 中&#xff0c;描述符&#xff08;Descriptor&#xff09;是一种对象属性的扩展机制&#xff0c;它允许你在访问或修改属性时执行自定义的操作。描述符是实现了特定协…

单片机学习笔记---定时器/计数器(简述版!)

目录 定时器的介绍 定时计数器的定时原理 定时计数器的内部结构 两种控制寄存器 &#xff08;1&#xff09;工作方式寄存器TMOD &#xff08;2&#xff09;控制寄存器TCON 定时计数器的工作方式 方式0 方式1 方式2 方式3 定时器的配置步骤 第一步&#xff0c;对…

MATLAB算法实战应用案例精讲-【人工智能】基于机器视觉的机器人及机械臂运动规划(最终篇)

目录 前言 几个高频面试题目 机械臂智能抓取涉及什么技术? 算法原理 智能抓取系统构成

《幻兽帕鲁》好玩吗?幻兽帕鲁能在Mac上运行吗?

最近一款叫做《幻兽帕鲁》的新游戏走红&#xff0c;成为了Steam游戏平台上&#xff0c;连续3周的销量冠军&#xff0c;有不少Mac电脑用户&#xff0c;利用Crossover成功玩上了《幻兽帕鲁》&#xff0c;其实Crossover已经支持很多3A游戏&#xff0c;包括《赛博朋克2077》《博德之…

Nicn的刷题日常之字符串左旋(详细图解思路,多解法,建议三连收藏)

目录 1.题目描述 一 2.解题想法图解 2.1直接解 2.2巧解 3.题目描述二 3.1.1思路1 3.1.2 思路2 4.结语 1.题目描述 一 实现现一个函数&#xff0c;可以左旋字符串中的k个字符。 例如&#xff1a; ABCD左旋一个字符得到BCDA ABCD左旋两个字符得到CDAB 2.解题想法图解 2.…

【QT+QGIS跨平台编译】之二十四:【GeoTIFF+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

文章目录 一、GeoTIFF介绍二、文件下载三、文件分析四、pro文件五、编译实践一、GeoTIFF介绍 GeoTIFF是一种常用的地理信息系统(GIS)文件格式,其采用标签结构将栅格地理空间数据以及相关的元数据存储在一个单一的文件中。它是基于标准的TIFF(Tagged Image File Format)格…

使用wda框架实现IOS自动化测试详解

目录 1、weditor元素定位工具 1.1、weditor的安装和使用 2、wda iOS自动化框架 2.1、wda概述 2.2、wda安装 2.3、wda的使用 2.3.1、全局配置 2.3.2、创建客户端 2.3.3、APP相关操作 1、启动APP 2、关闭APP 3、获取APP状态信息 4、获取当前APP的运行信息 2.3.4、设…

【leetcode题解C++】98.验证二叉搜索树 and 701.二叉搜索树中的插入操作

98. 验证二叉搜索树 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含 小于 当前节点的数。节点的右子树只包含 大于 当前节点的数。所有左子树和右子树自身必须也是二叉搜索树。 示例…

ubuntu22.04@laptop OpenCV安装

ubuntu22.04laptop OpenCV安装 1. 源由2. 验证环境3. OpenCV安装3.1 系统升级3.2 Python安装3.3 OpenCV C/C环境安装3.4 OpenCV Python虚拟环境安装3.5 OpenCV检查 4. 总结5. 参考资料6. 补充 - python环境 1. 源由 最近&#xff0c;打算在Companion Computer上一些目标识别的…

PyQtWebEngine模块,PyQt5用于处理网页渲染和浏览器功能

一、简介 PyQtWebEngine 是 PyQt5 框架的一个模块&#xff0c;用于在 PyQt5 应用程序中嵌入 Web 引擎功能。它基于 Qt WebEngine 技术&#xff0c;提供了与 Web 内容交互的功能&#xff0c;包括显示网页、执行 JavaScript、处理网络请求等。 以下是 PyQtWebEngine 的一些特点和…

MYSQL——MySQL8.3无法启动

在新电脑上装了个MySQL&#xff0c;但是无法使用net start mysql启动&#xff0c;很是纳闷&#xff0c;使用mysqld --console去查看报错&#xff0c;也是没报错的&#xff0c;但是奇怪的是&#xff0c;我输入完这个mysqld --console之后&#xff0c;就等于启动了mysql了&#x…

[python]基于opencv实现的车道线检测

【检测原理】 一、首先进行canny边缘检测&#xff0c;为获取车道线边缘做准备 二、进行ROI提取获取确切的车道线边缘&#xff08;红色线内部&#xff09; 三、利用概率霍夫变换获取直线&#xff0c;并将斜率正数和复数的线段给分割开来 四、离群值过滤&#xff0c;剔除斜率…

大数据平台-可视化面板介绍-Echarts

应对现在数据可视化的趋势&#xff0c;越来越多企业需要在很多场景(营销数据&#xff0c;生产数据&#xff0c;用户数据)下使用&#xff0c;可视化图表来展示体现数据&#xff0c;让数据更加直观&#xff0c;数据特点更加突出。 目录 01-使用技术 02- 案例适配方案 03-基础…

【大厂AI课学习笔记】1.4 算法的进步(4)关于李飞飞团队的ImageNet

第一个图像数据库是ImageNet&#xff0c;由斯坦福大学的计算机科学家李飞飞推出。ImageNet是一个大型的可视化数据库&#xff0c;旨在推动计算机视觉领域的研究。这个数据库包含了数以百万计的手工标记的图像&#xff0c;涵盖了数千个不同的类别。 基于ImageNet数据库&#xf…

编译Faiss-gpu【InterMKL】C++ 按步骤操作 基本不会有问题的 python原理相同。

编译Faiss-gpu C++ 基本介绍 使用Faiss版本【1.7.4】 该项目依赖于BLAS 组件 OpenBLAS 和 IntelMKL BLAS 【官方支持】 IntelMKL 会比 OpenBLAS 快的多。 【来自官方结论】 本机环境 Cuda :11.1 Cuda-Driver: 515 InterMKL: 2021.2.0 Faiss :1.7.4 注意:faiss仅…

podman详解

Podman 是一个开源项目&#xff0c;用于开发、管理和运行容器和容器镜像。它与 Docker 非常相似&#xff0c;但有一些关键的不同之处。 Podman 的主要特点包括&#xff1a; 无守护进程&#xff1a;不同于 Docker&#xff0c;Podman 不需要运行一个永久的守护进程。这使得 Podm…