RocketMQ教程(六):RocketMQ的消息生产

环境配置

RocketMQ版本:5.2.0

RocketMQ SDK版本:5.2.0

引入依赖

implementation 'org.apache.rocketmq:rocketmq-client:5.2.0'

消息生产

消息的种类分成四种,普通消息、顺序消息、事务消息和延时消息,发生消息的方式也分为同步发送、异步发送、单向发送 三种。

1、普通消息

同步发送

同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。

同步发送消息代码:

public class SyncSendProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {int count = 100;// 创建一个producer,参数为生产者组名称DefaultMQProducer producer = new DefaultMQProducer("apg");// 指定nameServer地址producer.setNamesrvAddr("192.168.131.130:9876");// 设置当发送失败时重试发送的次数,默认为2次producer.setRetryTimesWhenSendFailed(3);// 设置发送超时时限为5s,默认3sproducer.setSendMsgTimeout(5000);// 开启生产者producer.start();// 生产并发送100条消息for (int i=0;i<count;i++){byte[] body = ("Hi," + i).getBytes();Message msg = new Message("someTopic", "someTag", body);// 为消息指定keymsg.setKeys("key_"+i);// 发送消息SendResult sendResult = producer.send(msg);System.out.println(sendResult);}// 关闭生产者producer.shutdown();}
}

发送结果:

打印日志: 

SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB038A005D, offsetMsgId=0A02088900002A9F0000000000011708, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=3], queueOffset=73]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB038C005E, offsetMsgId=0A02088900002A9F00000000000117FC, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=0], queueOffset=73]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB038E005F, offsetMsgId=0A02088900002A9F00000000000118F0, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=1], queueOffset=73]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB038F0060, offsetMsgId=0A02088900002A9F00000000000119E4, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=2], queueOffset=74]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB03910061, offsetMsgId=0A02088900002A9F0000000000011AD8, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=3], queueOffset=74]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D8C73D16E9335AB03920062, offsetMsgId=0A02088900002A9F0000000000011BCC, messageQueue=MessageQueue [topic=someTopic, brokerName=DESKTOP-GAN2KUG, queueId=0], queueOffset=74]

可以看出来一个topic默认在一个broker上分配四个队列,每次投递消息是以轮询的方式每个队列投递一条消息。

对于返回结果中的sendStatus代表发送状态,在源码中有一下三个值:

public enum SendStatus {SEND_OK, FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE;private SendStatus() {}
}

SEND_OK:发送成功

FLUSH_DISK_TIMEOUT:刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出现这种异常状态,异步刷盘不会出现 。

FLUSH_SLAVE_TIMEOUT:Slave同步超时。当Broker集群设置的Master-Slave的复制方式为同步复制时才可能出现这种异常状态,异步复制不会出现 。

SLAVE_NOT_AVAILABLE:没有可用的Slave。当Broker集群设置为Master-Slave的 复制方式为同步复制时才可能出现这种异常状态,异步复制不会出现。

异步发送

异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。

代码:

public class AsyncSendProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {int messageCount = 100;// 创建一个producer,参数为生产者组名称DefaultMQProducer producer = new DefaultMQProducer("apg");// 指定nameServer地址producer.setNamesrvAddr("192.168.131.130:9876");// 指定异步发送失败后不进行重试发送producer.setRetryTimesWhenSendAsyncFailed(0);// 指定新创建的Topic的Queue数量为2,默认为4producer.setDefaultTopicQueueNums(2);// 开启生产者producer.start();// 生产并发送100条消息//由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i=0;i<messageCount;i++){try {final int index = i;byte[] body = ("Hi," + i).getBytes();Message msg = new Message("AsyncProductTopic", "AsyncTopic", body);producer.send(msg, new SendCallback() {//异步发送的回调@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});System.out.println("消息发送完成");}catch (Exception e){e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);// 关闭生产者producer.shutdown();}
}

发送结果

 打印日志 

6          OK 7F0000014DB473D16E9335B47CD60008 
2          OK 7F0000014DB473D16E9335B47CCA0000 
12         OK 7F0000014DB473D16E9335B47CD6000B 
4          OK 7F0000014DB473D16E9335B47CCA0002 
7          OK 7F0000014DB473D16E9335B47CD60007 

单向发送

单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/26321.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

15. 《C语言》——【如何动态内存开辟】

亲爱的读者&#xff0c;大家好&#xff01;我是一名正在学习编程的高校生。在这个博客里&#xff0c;我将和大家一起探讨编程技巧、分享实用工具&#xff0c;并交流学习心得。希望通过我的博客&#xff0c;你能学到有用的知识&#xff0c;提高自己的技能&#xff0c;成为一名优…

CST软件眼图工具Eye Diagram Tools (中)--- Classical流程

距离上次眼图介绍快两年了&#xff0c;由于上期已经将重点推荐的方法&#xff08;statistical流程&#xff09;介绍了&#xff0c;所以一直没急着涉及这个话题。 仿真实例011&#xff1a;眼图工具Eye Diagram Tools&#xff08;上&#xff09; 先总结一下之前介绍过的内容&am…

容器:现代计算的基础设施

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

大模型初学者怎么入门大语言模型(LLM)?

前言 在当今的科技浪潮中&#xff0c;人工智能&#xff08;AI&#xff09;和机器学习技术已经取得了长足的进步&#xff0c;其中大模型的发展尤为引人注目。大模型&#xff0c;通常指的是拥有海量参数、能够处理复杂任务的深度学习模型&#xff0c;如自然语言处理&#xff08;…

RT-DETR 详解之 Uncertainty-minimal Query Selection

引言 在上一章博客中博主已经完成查询去噪向量构造部分的讲解&#xff08;DeNoise&#xff09;在本篇博客中&#xff0c;我们将进行Uncertainty-minimal Query Selection创新点的讲解。 Uncertainty-minimal Query Selection是RT-DETR提出的第二个创新点&#xff0c;其作用是…

UE5 Sequencer 使用指导 - 学习笔记

https://www.bilibili.com/video/BV1jG411L7r7/?spm_id_from333.337.search-card.all.click&vd_source707ec8983cc32e6e065d5496a7f79ee6 Sequencer 01 1.1 调整视口 调整窗口数量 调整视口类型为Cinematic视口 视口显示网格&#xff0c;或者条件参考线 1.2 关卡动画与…

架构设计-用户信息及用户相关的密码信息设计

将用户的基本信息和用户密码存放在不同的数据库表中是一种常见的安全做法&#xff0c;这种做法旨在增强数据的安全性和管理的灵活性。以下是这种做法的几个关键原因&#xff1a; 安全性增强&#xff1a; 当用户密码被单独存放在一个表中时&#xff0c;可以使用更强大的加密和哈…

计算机毕业设计 | SpringBoot+vue的教务管理系统

1&#xff0c;绪论 1.1 项目背景 在这个资讯高度发展的时代&#xff0c;资讯管理变革已经是一个更为宽泛、更为全面的潮流。为了保证中国的可持续发展&#xff0c;随着信息化技术的不断进步&#xff0c;教务管理体系也在不断完善。与此同时&#xff0c;伴随着信息化的飞速发展…

Hexapod C-887 使用手册 -- 1

最近要做PI C-887的集成控制&#xff0c;先把使用手册看一便&#xff0c;在此记录阅读进度。 本人微信号&#xff0c;如果有项目合作&#xff0c;可以加本人微信。 1 有关此文档 在本章中 本用户手册的目标和目标群体 符号和打印规则 图 术语的定义 其它适用文件 下载手…

Python GUI 库跨平台兼容问题及解决方案

在选择 Python GUI 库时&#xff0c;跨平台兼容性是一个重要的考虑因素。不同的 GUI 库可能在不同的操作系统上表现不同&#xff0c;因此需要选择一个能够在多个平台上运行良好的库。如果我们遇到下面的问题&#xff0c;可以尝试下我整理的方法。 1、问题背景 Python 作为一门…

基于【Lama Cleaner】一键秒去水印,轻松移除不想要的内容!

一、项目背景 革命性的AI图像编辑技术,让您的图片焕然一新!无论水印、logo、不想要的人物或物体,都能被神奇地移除,只留下纯净的画面。操作简单,效果出众,给你全新的视觉体验。开启图像编辑新纪元,尽在掌控! 利用去水印开源工具Lama Cleaner对照片中"杂质"进行去除…

【Vue】获取模块内的actions方法

目标&#xff1a; 掌握模块中 action 的调用语法 (同理 - 直接类比 mutation 即可) 注意&#xff1a; 默认模块中的 mutation 和 actions 会被挂载到全局&#xff0c;需要开启命名空间&#xff0c;才会挂载到子模块。 调用语法&#xff1a; 直接通过 store 调用 $store.di…

【Go语言】面向对象编程(一):类的定义、初始化和成员方法

面向对象编程&#xff08;一&#xff09;&#xff1a;类的定义、初始化和成员方法 1 类的定义和初始化 Go 语言的面向对象编程没有 class 、 extends 、implements 之类的关键字和相应的概念&#xff0c;而是借助结构体来实现类的声明&#xff0c;如下是定义一个学生类的方法…

Virtual Memory Primitives for User Program翻译

Virtual Memory Primitives for User Program 安德鲁阿普尔&#xff08;Andrew Appel&#xff09;和李凯&#xff08;Kai Li&#xff09; 普林斯顿大学计算机科学系 摘要 传统上&#xff0c;内存管理单元&#xff08;MMUS&#xff09;被操作系统用于实现磁盘分页的虚拟内存…

FullCalendar日历组件集成实战(9)

背景 有一些应用系统或应用功能&#xff0c;如日程管理、任务管理需要使用到日历组件。虽然Element Plus也提供了日历组件&#xff0c;但功能比较简单&#xff0c;用来做数据展现勉强可用。但如果需要进行复杂的数据展示&#xff0c;以及互动操作如通过点击添加事件&#xff0…

一文详谈大模型 RAG 优化方案与实践

暑期实习基本结束了&#xff0c;校招即将开启。 不同以往的是&#xff0c;当前职场环境已不再是那个双向奔赴时代了。求职者在变多&#xff0c;HC 在变少&#xff0c;岗位要求还更高了。提前准备才是完全之策。 最近&#xff0c;我们又陆续整理了很多大厂的面试题&#xff0c…

肾合与出汗:一场你不得不关注的健康对话

设想一下&#xff0c;我们的身体就像是一部精妙复杂的交响乐&#xff0c;每一个细胞、每一个组织都是乐符&#xff0c;共同编织出生命的旋律&#xff0c;演绎着我们的过去与未来。而汗水&#xff0c;就如同交响乐中的琴弦振动&#xff0c;它流淌在我们的体表&#xff0c;记录着…

电商API接口接入||电商比价项目比价系统搭建需要注意哪些?

在搭建一个淘宝/京东比价系统时&#xff0c;需要注意以下几个方面&#xff0c;以确保系统的有效性、准确性和用户友好性&#xff1a; 确定平台和商品范围&#xff1a; 明确系统覆盖的电商平台&#xff0c;如淘宝、京东等。确定要比较的商品类别和范围&#xff0c;以确保数据的…

JFinal学习06 控制器——getPara()接收数据

JFinal学习06 控制器——getPara()接收数据 视频来源https://www.bilibili.com/video/BV1Bt411H7J9/?spm_id_from333.337.search-card.all.click 文章目录 JFinal学习06 控制器——getPara()接收数据零、JFinal数据提交的三种方式一、get提交二、post提交三、url参数化提交四、…