RocketMQ-快速实战

MQ简介

MQ:MessageQueue,消息队列。是在互联网中使用非常广泛的一系列服务中间件。

Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上,也可以分布在不同机器上。(数据形式:二进制压缩数据、RPC、http,都属于进程间通讯的机制)

Queue:队列。队列原意是指一种具有FIFO(先进先出)特性的数据结构,是用来缓存数据的。对于消息中间件产品来说,能不能保证FIFO特性,尚值得考量。但是,所有消息队列都是需要具备存储消息,让消息排队的能力。

作用:

  • 异步,提高系统的响应速度、吞吐量。

  • 解耦,减少服务之间的影响。提高系统整体的稳定性以及可扩展性。另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

  • 消峰,以稳定的系统资源应对突发的流量冲击。

RocketMQ产品特点

RocketMQ介绍

RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。

早期阿里使用ActiveMQ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是,阿里开始关注Kafka。但是Kafka是针对日志收集场景设计的,他的高级功能并不是很贴合阿里的业务场景。尤其当他的Topic过多时,由于Partition文件也会过多,这就会加大文件索引的耗时,会严重影响IO性能。于是阿里才决定自研中间件,最早叫做MetaQ,后来改名成为RocketMQ。最早他所希望解决的最大问题就是多Topic下的IO性能压力。但是产品在阿里内部的不断改进,RocketMQ开始体现出一些不一样的优势。

RocketMQ特点

当今互联网MQ产品众多,其中,影响力和使用范围最大的当数Apache Kafka、RabbitMQ、Apache RocketMQ以及Apache Plusar。这几大产品虽然都是典型的MQ产品,但是由于设计和实现上的一些差异,造成他们适合于不同的细分场景。

优点缺点适合场景
Apache Kafka吞吐量非常大,性能非常好,集群高可用。会有丢数据的可能,功能比较单一日志分析、大数据采集
RabbitMQ消息可靠性高,功能全面。erlang语言不好定制。吞吐量比较低。企业内部小规模服务调用
Apache Pulsar基于Bookeeper构建,消息可靠性非常高。周边生态还有差距,目前使用的公司比较少。企业内部大规模服务调用
Apache RocketMQ高吞吐、高性能、高可用。功能全面。客户端协议丰富。使用java语言开发,方便定制。服务加载比较慢。几乎全场景,特别适合金融场景

其中RocketMQ,孵化自阿里巴巴。历经阿里多年双十一的严格考验,RocketMQ可以说是从全世界最严苛的高并发场景中摸爬滚打出来的过硬产品,也是少数几个在金融场景比较适用的MQ产品。从横向对比来看,RocketMQ与Kafka和RabbitMQ相比。RocketMQ的消息吞吐量虽然和Kafka相比还是稍有差距,但是却比RabbitMQ高很多。在阿里内部,RocketMQ集群每天处理的请求数超过5万亿次,支持的核心应用超过3000个。而RocketMQ最大的优势就是他天生就为金融互联网而生。他的消息可靠性相比Kafka也有了很大的提升,而消息吞吐量相比RabbitMQ也有很大的提升。另外,RocketMQ的高级功能也越来越全面,广播消费、延迟队列、死信队列等等高级功能一应俱全,甚至某些业务功能比如事务消息,已经呈现出领先潮流的趋势。

RocketMQ快速实战

快速搭建RocketMQ服务

RocketMQ的官网地址: RocketMQ · 官方网站 | RocketMQ

下载页面地址:下载 | RocketMQ

当前最新的版本是5.x,这是一个着眼于云原生的新版本,给 RocketMQ 带来了非常多很亮眼的新特性。但是目前来看,企业中用得还比较少。因此,我们这里采用的还是更为稳定的4.9.5版本。

注:在2020年下半年,RocketMQ新推出了5.0的大版本,这对于RocketMQ来说,是一个里程碑式的大版本。在这个大版本中,RocketMQ对整体功能做了一次大的升级。增加了很多非常有用的新特性,也对已有功能重新做了升级。

比如在具体功能方面,在4.x版本中,对于定时消息,只能设定几个固定的延迟级别,而5.0版本中,已经可以指定具体的发送时间了。在客户端语言方面,4.x版本,RocketMQ原生只支持基于Netty框架的Java客户端。而在5.0版本中,增加了对Grpc协议的支持,这基本上就解除了对客户端语言的限制。在服务端架构方面,4.x版本只支持固定角色的普通集群和可以动态切换角色的Dledger集群,而在5.0版本中,增加了Dledger Controller混合集群模式,即可以混合使用Dledger的集群机制以及 Broker 本地的文件管理机制。

但是功能强大,同时也意味着问题会很多。所以目前来看,企业中直接用新版本的还比较少。小部分使用新版本的企业,也大都是使用内部的改造优化版本。

这里下载的是这个版本:

上传到服务器并解压:(unzip rocketmq-all-4.9.5-bin-release.zip

RocketMQ建议的运行环境需要至少12G的内存,这是生产环境比较理想的资源配置。但是我买的云服务器是2核4g,所以需要修改启动配置:(:set number临时显示行号)

注意:生产环境不建议修改上面两个配置。

RocketMQ是基于Java开发的,所以依赖Java开发环境,安装JDK步骤省略,建议采用1.8版本

RocketMQ的后端服务分为nameserver和broker两个服务:

# 第一步:启动nameserver服务,进入安装目录执行命令
nohup bin/mqnamesrv &
# 是否启动成功可以通过jps检查,启动成功或失败可以查看nohup.out文件# 为了方便测试在conf/broker.conf文件添加配置:
autoCreateTopicEnable=true
# 注意:如果是云服务器,还需要额外添加一行配置
brokerIP1 = 你的公网IP# 第二步:启动broker服务,进入安装目录执行命令
nohup bin/mqbroker &

注意:

1、在实际服务部署时,通常会将RocketMQ的部署地址添加到环境变量当中。例如使用vi ~/.bash_profile指令,添加以下内容

export ROCKETMQ_HOME=/home/rocket/rocketmq-all-4.9.5-bin-release // 修改为你的安装目录
PATH=$ROCKETMQ_HOME/bin:$PATH
export PATH

2、停止RocketMQ服务可以通过mqshutdown指令进行,停止服务有短暂延迟,不建议kill杀进程

mqshutdown namesrv # 关闭nameserver服务
mqshutdown broker # 关闭broker服务

快速实现消息收发

1、命令行快速实现消息收发

第一步:需要配置一个环境变量NAMESRV_ADDR,指向之前启动的nameserver服务。

通过vi ~/.bash_profile添加以下配置。然后使用source ~/.bash_profile让配置生效。

export NAMESRV_ADDR='localhost:9876'

修改后文件:

第二步:通过指令启动RocketMQ的消息生产者发送消息。默认往RocketMQ中发送1000条消息

tools.sh org.apache.rocketmq.example.quickstart.Producer ...消息发送日志
SendResult [sendStatus=SEND_OK, msgId=7F0000018FBA1B6D358697CBE7FB03E7, offsetMsgId=C0A800DA00002A9F000000000005DA64, messageQueue=MessageQueue [topic=TopicTest, brokerName=hcss-ecs-3744, queueId=3], queueOffset=499]
11:25:22.820 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:25:22.825 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:25:22.825 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.0.218:10911] result: true

第三步:可以启动消息消费者接收之前发送的消息

tools.sh org.apache.rocketmq.example.quickstart.Consumer...消息消费日志
ConsumeMessageThread_please_rename_unique_group_name_4_15 Receive New Messages: [MessageExt [brokerName=hcss-ecs-3744, queueId=2, storeSize=192, queueOffset=199, sysFlag=0, bornTimestamp=1701312827986, bornHost=/192.168.0.218:32850, storeTimestamp=1701312827987, storeHost=/192.168.0.218:10911, msgId=C0A800DA00002A9F00000000000256D2, commitLogOffset=153298, bodyCRC=748130833, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1701314617997, UNIQ_KEY=7F0000018E561B6D358697AEFE52031F, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 57, 57], transactionId='null'}]]

注意:这个Consumer消费者的指令并不会主动结束,他会继续挂起,等待消费新的消息。可以使用CTRL+C停止该进程。

2、搭建Maven客户端项目

第一步:创建一个标准的maven项目,在pom.xml中引入以下核心依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.5</version>
</dependency>

第二步:就可以直接创建一个简单的消息生产者

public class Producer
{public static void main(String[] args)throws MQClientException, InterruptedException{// 初始化一个消息生产者DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 指定nameserver地址producer.setNamesrvAddr("192.168.232.128:9876");// 启动消息生产者服务producer.start();for (int i = 0; i < 2; i++){try{// 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容Message msg =new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息,获取发送结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}catch (Exception e){e.printStackTrace();Thread.sleep(1000);}}// 消息发送完后,停止消息生产者服务。producer.shutdown();}
}

注意:对于生产者,需要指定对应的nameserver服务的地址,这个地址需要指向你自己的服务器。

第三步:创建一个消息消费者接收RocketMQ中的消息。

public class Consumer
{public static void main(String[] args)throws InterruptedException, MQClientException{// 构建一个消息消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// 指定nameserver地址consumer.setNamesrvAddr("192.168.232.128:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅一个感兴趣的话题,这个话题需要与消息的topic一致consumer.subscribe("TopicTest", "*");// 注册一个消息回调函数,消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently(){@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){msgs.forEach(messageExt -> {try{System.out.println("收到消息:" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));}catch (UnsupportedEncodingException e){}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者服务consumer.start();System.out.print("Consumer Started");}
}

注意:对于消费者,同样需要指定nameserver的地址,另外消费者需要在RocketMQ中订阅具体的Topic,只有发送到这个Topic上的消息才会被这个消费者接收到

生产消费报错:RemotingTooMuchRequestException: sendDefaultImpl call timeout

解决方法:

1、在conf/broker.conf 中加入配置:

namesrvAddr = 你的公网IP:9876
brokerIP1 = 你的公网IP

2、重启broker,启动命令指定配置文件:

nohup mqbroker -n localhost:9876 -c conf/broker.conf &

重启完成,上面的生产者消费者测试代码通过

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/188416.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++笔记】红黑树封装map和set

一、map和set的泛型封装逻辑 map和set的底层都是红黑树&#xff0c;所以我们想要用红黑树封装map和set的第一个问题就来了&#xff0c;因为set是key结构而map是key-value结构&#xff0c;怎样用同一个底层结构去封装出两个不同存储结构的容器呢&#xff1f;难道我们要将红黑树…

git的版本控制流程

1、git是一款版本控制工具 例如我们常用的淘宝&#xff0c;每次升级&#xff0c;版本号就会加一。那么我们怎么控制版本号呢&#xff1f; --使用git。 2、最常使用的git指令 git add . 暂存 git commit -m"***" 提交到本地 git pull 将远程仓库代码下拉到本地 git …

threejs教程

应群友要求出了个小白教程&#xff0c;此外也有进阶教程。 替代之前老版本的教程。 教程案例&#xff1a; 新教程地址&#xff1a;https://www.wellyyss.cn/ys-course/#/ 教程使用的是react搭建的 高级教程主要是案例 年底比较忙估计要晚一点才整合。 后续的计划是&#xff1…

Leetcod面试经典150题刷题记录——数组 / 字符串篇

数组 / 字符串篇 1. 合并两个有序数组Python3排序法双指针法 2. 删除有序数组中的重复元素3. H 指数Python3排序法计数排序法二分查找 有个技巧&#xff0c;若想熟悉语言的写法&#xff0c;可以照着其它语言的题解&#xff0c;写目标语言的代码&#xff0c;比如有C/C的题解&…

HarmonyOS应用开发者基础认证考试(98分答案)

基于最近大家都在考这个应用开发者基础认证考试&#xff0c;因此出了一期&#xff0c;一样复制word里面搜索做&#xff0c;很快&#xff0c;当然good luck 判断题 Ability是系统调度应用的最小单元,是能够完成一个独立功能的组件。一个应用可以包含一个或多个Ability。 正确(Tr…

应用于智慧社区的AI边缘计算盒子+AI算法软硬一体化方案

据统计&#xff0c;全国大约有45W个小区&#xff0c;监控高空抛物、治理乱扔垃圾、人员管理、烟火检测、占道、人流量检测、车型检测等&#xff1b;营造社区安全等需求跟每一个参与者息息相关&#xff0c;据法院公开资料显示&#xff0c;每年有1000宗以上跟高空抛物有关的各类案…

进程与线程的区别

作者简介&#xff1a; zoro-1&#xff0c;目前大二&#xff0c;正在学习Java&#xff0c;数据结构,mysql,javaee等 作者主页&#xff1a; zoro-1的主页 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01;&#x1f496;&#x1f496; 进程与线程的区别 进程线程进程与线…

HyperBDR云容灾v4.10.1发布,划重点:支持UCloud云平台自动化容灾+新增可灵活定义的备份策略

版本更新 HyperBDR云容灾v4.10.1版本来啦&#xff01; 此次更新为大家带来了多个新功能&#xff0c;下面让我们来看看具体是哪些吧~ 01 策略管理新功能&#xff1a; 多时间段限速功能&#xff1a; 更加灵活的多个时间段限速选择&#xff0c;可以在创建策略时为不同的时间段设…

数字人可以为文化传播带来什么?

近日&#xff0c;由哈萨克斯坦驻华大使馆、中国外文局文化传播中心、中关村科幻产业创新中心联合发起的中哈青年友谊数字人怡漾和苏路&#xff08;Сұлу&#xff09;正式发布。其中&#xff0c;代表中方形象的数字人怡漾&#xff0c;不仅将成为中哈青年文化交流的标志与代言…

JavaScript WebAPI(三)(详解)

这次介绍一下webAPI中的一些知识&#xff1a; 回调函数 回调函数是指 如果将函数A做为参数传递给函数B时&#xff0c;我们称函数A为回调函数 例如&#xff1a; // 立即执行函数中传递的函数是一个回调函数 (function(){ console.log("我是回调函数") })(); // …

无分模块下Mybatis官方生成器

有无分模块其实对生成器生成代码没有影响&#xff0c;只是无分模块更加好理解&#xff0c;也就是添加插件和依赖&#xff0c;指定配置文件&#xff0c;运行插件即可。 添加依赖和插件 在插件处指定配置文件位置 <dependencies><!-- MyBatis核心依赖包 --><de…

汽车行驶不同工况数据

1、内容简介 略 28-可以交流、咨询、答疑 2、内容说明 汽车行驶不同工况数据 汽车行驶不同工况数据 ECE、EUDC、FTP75、NEDC、自定义 3、仿真分析 4、参考论文 略 链接&#xff1a;https://pan.baidu.com/s/1AAJ_SlHseYpa5HAwMJlk1w 提取码&#xff1a;rvol

Visual Studio2022创建Windows服务程序

文章目录 Visual Studio2022创建Windows服务程序打开工具创建新项目创建成功重命名服务添加安装程序编写逻辑生成程序安装服务打开服务启动服务停止服务卸载服务修改项目配置重新生成安装服务启动服务 Visual Studio2022创建Windows服务程序 打开工具 创建新项目 创建成功 重命…

文件重命名不再困难:文件智能化重命名技巧,告别手动提升效率

在日常工作中&#xff0c;经常会遇到要修改文件名的场景。传统的文件重命名方法往往要手动输入新的文件名&#xff0c;不仅耗时而且容易出错。为了提高效率&#xff0c;可以采用一些智能化重命名的技巧&#xff0c;告别手动修改文件名的繁琐过程&#xff0c;让文件重命名变得更…

华为鸿蒙工程师成“热门”!最高年薪160万,只看技术不看年龄

引言&#xff1a; 今天&#xff0c;在互联网行业&#xff0c;超过30岁的工程师往往被认为是“码农”的生命周期终点。然而&#xff0c;华为鸿蒙系统的崛起&#xff0c;却再次给予了这些35岁以上的工程师们第二春的机会。国内一线互联网公司纷纷向鸿蒙系统靠拢&#xff0c;导致…

代码随想录算法训练营第五十二天【动态规划part13】 | 300.最长递增子序列、674. 最长连续递增序列、718. 最长重复子数组

300.最长递增子序列 题目链接 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 求解思路 动规五部曲 1.dp数组及其下标定义&#xff1a; dp[i]表示包括i以前的以nums[i]结尾的最长递增子序列的长度 2.状态转移方程&#xff1a; 位置i的最长升序…

一文带你了解网络安全简史

网络安全简史 1. 上古时代1.1 计算机病毒的理论原型1.2 早期计算机病毒1.3 主要特点 2. 黑客时代2.1 计算机病毒的大流行2.2 知名计算机病毒2.3 主要特点 3. 黑产时代3.1 网络威胁持续升级3.2 代表性事件3.3 主要特点 4 高级威胁时代4.1 高级威胁时代到来4.2 著名的APT组织4.3 …

【note: This is an issue with the package mentioned above, not pip.】

安装gym时出现问题&#xff0c;note: This is an issue with the package mentioned above, not pip. 报错原因&#xff1a; 缺失了某些依赖模块&#xff0c;所以安装报错。 Collecting package metadata (current_repodata.json): done Solving environment: failed with in…

使用 mtcnn 和 facenet 进行人脸识别

一、前言 人脸识别目前有比较多的应用了&#xff0c;比如门禁系统&#xff0c;手机的人脸解锁等等&#xff0c;今天&#xff0c;我们也来实现一个简单的人脸识别。 二、思维导图 三、详细步骤 3.1 准备 3.1.1 facenet 权重文件下载 下载地址&#xff1a;https://drive.goo…

高等数学中的近似计算

今天来总结一下同济版高数中有关近似计算的例子&#xff0c;总的来说是如下的三种&#xff0c;有遗漏的话可以在评论区指出~ 目录 一.微分在近似计算中的应用 二.全微分在近似计算中的应用 三.函数的幂级数展开在近似计算的应用 一.微分在近似计算中的应用 本质原理是&am…