四、RocketMQ发送普通消息、批量消息和延迟消息

Producer发送普通消息的方式

1.同步发送消息

同步消息代表发送端发送消息到broker之后,等待消息发送结果后,再次发送消息
在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送消息
@Test
public void syncSend() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {// 1.创建生产端,声明在哪个生产组DefaultMQProducer producer = new DefaultMQProducer("test_group");// 2.注册NameServer地址producer.setNamesrvAddr(NAME_SERVER_ADDR);// 3.构建Message实体,指定topic、tag、bodyMessage message = new Message("test", "hello world".getBytes());// 4.启动生产端producer.start();// 5.发送消息SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());
}

2.异步发送消息

异步消息代表发送端发送完消息后,会直接返回,但是可以注册一个回调函数,当broker将消息落盘后,回调这个回调函数
在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送消息,并且实现SendCallback接口

注:这里必须等待异步返回,否则消费者无法消费成功

@Test
public void asyncSend() throws  RemotingException, InterruptedException, MQClientException {DefaultMQProducer producer = new DefaultMQProducer("test_group");producer.setNamesrvAddr(NAME_SERVER_ADDR);Message message = new Message("test", "tag-a","hello world".getBytes());producer.start();CountDownLatch countDownLatch = new CountDownLatch(1);// 发送消息,并且实现SendCallback接口producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.println("发送成功:" + sendResult.getSendStatus());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.println("发送失败:" + e);}});countDownLatch.await();
}

3、发送单向消息

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短

在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送单向消息
@Test
public void sendOneWay() throws  RemotingException, InterruptedException, MQClientException {DefaultMQProducer producer = new DefaultMQProducer("test_group");producer.setNamesrvAddr(NAME_SERVER_ADDR);Message message = new Message("test","tag-a", "hello world".getBytes());producer.start();producer.sendOneway(message);
}

Producer发送批量消息

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

@Test
public void sendBatch() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);// 构造批量消息List<Message> list = new ArrayList<>();list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world0".getBytes(Charset.defaultCharset())));list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world1".getBytes(Charset.defaultCharset())));list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world2".getBytes(Charset.defaultCharset())));producer.start();// 发送批量消息producer.send(list);producer.shutdown();
}

**注:**需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。

Producer发送延迟消息

Producer想要发送延迟消息,只要设置Message的DelayTimeLevel属性大于0即可。

RocketMQ无法随意设置延迟消息的延迟时间,只能根据延迟级别进行

延迟级别和延迟时间的对应关系

延迟级别延迟时间延迟级别延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h
@Test
public void sendDelay() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);producer.start();Message message = new Message(RocketMQConfig.TEST_TOPIC, "hello world".getBytes(Charset.defaultCharset()));// 设置延迟级别message.setDelayTimeLevel(3);// 发送批量消息SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());producer.shutdown();
}

延迟消息的原理

延迟消息并不会直接发送到指定的topic,而是发送到一个延迟消息对应的topic中

当延迟消息的时间到达后,在将消息发送到指定的topic中

延迟消息投递的流程

  1. producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别

  2. broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别的delayLevel-1

  3. mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列

  4. 根据消费偏移量offset从commitLog中解析出对应消息

  5. 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递

  6. 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/104412.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows PowerShell 软件安装

Windows Management Framework&#xff08;WMF&#xff09;5.1 包含PowerShell 5.1。默认情况下&#xff0c;Windows Server 2008 R2 SP1 将运行较旧的PowerShell版本 2.通过将 WMF 5.1 下载并安装到Windows Server 2008 R2 系统&#xff0c;我们可以将其升级到PowerShell版本 …

GBJ2510-ASEMI电源控制柜专用GBJ2510

编辑&#xff1a;ll GBJ2510-ASEMI电源控制柜专用GBJ2510 型号&#xff1a;GBJ2510 品牌&#xff1a;ASEMI 封装&#xff1a;GBJ-4 恢复时间&#xff1a;&#xff1e;50ns 正向电流&#xff1a;25A 反向耐压&#xff1a;1000V 芯片个数&#xff1a;4 引脚数量&#xf…

HBase 表如何按照某表字段排序后顺序存储的方法?

首先需要明白HBase表的排序规则&#xff1a; &#xff08;1&#xff09;rowkey排序&#xff08;字典排序&#xff09;——升序 &#xff08;2&#xff09;Column排序&#xff08;字典排序&#xff09;——升序 &#xff08;3&#xff09;时间戳排序——降序 rowkey 字典序排序…

计算机毕业设计选什么题目好?springboot 医院门诊在线预约挂号系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

高数笔记03:几何、物理应用

图源&#xff1a;文心一言 本文是我学习高等数学几何、物理应用的一些笔记和心得&#xff0c;希望可以与考研路上的小伙伴一起努力上岸~~&#x1f95d;&#x1f95d; 第1版&#xff1a;查资料、画导图~&#x1f9e9;&#x1f9e9; 参考资料&#xff1a;《高等数学 基础篇》武…

读书笔记:多Transformer的双向编码器表示法(Bert)-3

多Transformer的双向编码器表示法 Bidirectional Encoder Representations from Transformers&#xff0c;即Bert&#xff1b; 第3章 Bert实战 学习如何使用预训练的BERT模型&#xff1a; 如何使用预训练的BERT模型作为特征提取器&#xff1b;探究Hugging Face的Transforme…

PyTorch 深度学习之多分类问题Softmax Classifier(八)

1. Revision: Diabetes dataset 2. Design 10 outputs using Sigmoid? 2.1 Output a Distribution of prediction with Softmax 2.2 Softmax Layer Example, 2.3 Loss Function-Cross Entropy Cross Entropy in Numpy Cross Entropy in PyTorch 注意交叉熵损失&#xff0c;最…

Vscode中使用Romote远程开发调试Ros2环境

首先&#xff0c;成功安装ros2环境&#xff0c;参考官方文档中的教程&#xff0c;能用运行出来此处的代码 Writing a simple publisher and subscriber (Python) — ROS 2 Documentation: Iron documentation 下载vscode&#xff0c;进行远程开发&#xff0c;具体参考&#xf…

XSS、CSRF、sql注入

sql注入 就是通过把SQL命令插入到Web表单递交或输入域名或页面请求的查询字符串&#xff0c;最终达到欺骗服务器执行恶意的SQL命令。 sql注入防范 1.永远不要信任用户的输入&#xff0c;要对用户的输入进行校验&#xff0c;可以通过正则表达式&#xff0c;或限制长度&#x…

消息队列缓存,以蓝牙消息服务为例

前言 消息队列缓存&#xff0c;支持阻塞、非阻塞模式&#xff1b;支持协议、非协议模式 可自定义消息结构体数据内容 使用者只需设置一些宏定义、调用相应接口即可 这里我用蓝牙消息服务举例 有纰漏请指出&#xff0c;转载请说明。 学习交流请发邮件 1280253714qq.com 原…

IDEA通过Docker插件部署SpringBoot项目

1、配置Docker远程连接端口 找到并编辑服务器上的docker.service文件。 vim /usr/lib/systemd/system/docker.service在下面ExecStart替换成下面的 ExecStart/usr/bin/dockerd -H tcp://0.0.0.0:2375 -H unix://var/run/docker.sock2.重启docker systemctl daemon-reload s…

Kafka 开启SASL/SCRAM认证 及 ACL授权(三)验证

Kafka 开启SASL/SCRAM认证 及 ACL授权(三)验证。 官网地址:https://kafka.apache.org/ 本文说明如何做client验证ACL是否生效,我们之前开启了无acl信息不允许访问的配置。涉及的client有以下几个场景:shell脚本、python脚本、java应用、flink流。 kafka shell script验证…

内存文件初始化

要在内存中初始化一个SQLite数据库文件&#xff0c;可以使用SQLite提供的特殊URI格式进行连接。以下是一种常见的方法&#xff1a; #include <sqlite3.h>int main() {sqlite3* db;// 在内存中创建或打开数据库文件int ret sqlite3_open(":memory:", &db)…

墨者学院 WordPress 远程命令执行漏洞(CVE-2018-15877)

1. 背景介绍 近日&#xff0c;WordPress 插件Plainview Activity Monitor被曝出存在一个远程命令执行漏洞。Plainview Activity Monitor 是一款网站用户活动监控插件。 远程攻击者可以通过构造的url来诱导wordpress管理员来点击恶意链接最终导致远程命令执行 2.影响范围 Pla…

Windows bat 脚本设计-开机自启动服务的方法、bat 调用另外的 bat 脚本 -没有java环境也能运行jar,在不安装jdk下如何运行jar包

目录 一、start.bat 启动服务 bat 脚本代码设计 && 没有java环境也能运行jar&#xff0c;在不安装jdk下如何运行jar包二、关闭 bat 启动的服务三、Windows 开机自启动服务的方法四、bat 调用另外的 bat 脚本参考链接 一、start.bat 启动服务 bat 脚本代码设计 &&am…

07_03文件系统怎么玩的

文件系统 Linux将文件系统分为了两层&#xff1a;VFS&#xff08;虚拟文件系统&#xff09;、具体文件系统&#xff0c;如下图所示&#xff1a; VFS&#xff08;Virtual Filesystem Switch&#xff09;称为虚拟文件系统或虚拟文件系统转换&#xff0c;是一个内核软件层&#…

chatgpt GPT-4V是如何实现语音对话的

直接上代码 https://chat.openai.com/voice/get_token 1. 请求内容 Request:GET /voice/get_token HTTP/1.1 Host: ios.chat.openai.com Content-Type: application/json Cookie: _puiduser***Fc9T:16962276****Nph%2Fb**SU%3D; _uasid"Z0FBQUF***nPT0"; __cf_bmBUg…

【解题报告】牛客挑战赛70 maimai

题目链接 这个挑战赛的 F F F是我出的&#xff0c;最后 zhoukangyang 爆标了。。。orzorz 记所有有颜色的边的属性集合 S S S 。 首先在外层容斥&#xff0c;枚举 S ∈ [ 0 , 2 w ) S\in [0,2^w) S∈[0,2w)&#xff0c;计算被覆盖的的边中不包含 S S S 中属性&#xff0c…

目标检测网络系列——YOLO V2

文章目录 YOLO9000better,更准batch Normalization高分辨率的训练使用anchor锚框尺寸的选择——聚类锚框集成改进——直接预测bounding box细粒度的特征图——passthrough layer多尺度训练数据集比对实验VOC 2007VOC 2012COCOFaster,更快网络模型——Darknet19训练方法Strong…