RocketMQ保姆级教程

RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,总之就是葛大爷的一句话

图片

整篇文章可以大致分为三个部分,第一部分属于一些核心概念和工作流程的讲解;第二部分就是纯手动搭建了一套环境;第三部分是基于环境进行测试和集成到SpringBoot,因为整个过程讲的比较细,所以我称之为“保姆级教程”。

好了,废话补多少,直接进入主题。

核心概念

  • NameServer:可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。

  • Broker:核心的一个角色,主要是用来保存topic的信息,接受生产者产生的消息,持久化消息。在一个Broker集群中,相同的BrokerName可以称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。

  • 生产者:生产消息的一方就是生产者

  • 生产者组:一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组

  • 消费者:用来消费生产者消息的一方

  • 消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。

  • topic(主题):可以理解为一个消息的集合的名字,生产者在发送消息的时候需要指定发到哪个topic下,消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。

  • Tag(子主题):比topic低一级,可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。

这里有组的概念是因为可以用来做到不同的生产者组或者消费者组有不同的配置,这样就可以使得生产者或者消费者更加灵活。

工作流程

说完核心概念,再来说一下核心的工作流程,这里我先画了一张图。

图片

通过这张图就可以很清楚的知道,RocketMQ大致的工作流程:

  • Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册自己的信息,这些信息包括自己的ip和端口号,自己这台Broker有哪些topic等信息。

  • Producer在启动之后会跟会NameServer建立连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。

  • Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份

  • Consumer启动之后也会跟会NameServer建立连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费

就跟上面的图一样,整体的工作流程还是比较简单的,这里我简化了很多概念,主要是为了好理解。

环境搭建

终于讲完了一些简单的概念,接下来就来搭建一套RocketMQ的环境。

通过上面分析,我们知道,在RocketMQ中有NameServer、Broker、生产者、消费者四种角色。而生产者和消费者实际上就是业务系统,所以这里不需要搭建,真正要搭建的就是NameServer和Broker,但是为了方便RocketMQ数据的可视化,这里我多搭建一套可视化的服务。

搭建过程比较简单,按照步骤一步一步来就可以完成,如果提示一些命令不存在,那么直接通过yum安装这些命令就行。

一、准备

需要准备一个linux服务器,需要先安装好JDK

关闭防火墙

systemctl stop firewalld
systemctl disable firewalld
下载并解压RocketMQ
1、创建一个目录,用来存放rocketmq相关的东西
mkdir /usr/rocketmq
cd /usr/rocketmq
2、下载并解压rocketmq

下载

wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

解压

unzip rocketmq-all-4.7.1-bin-release.zip

看到这一个文件夹就完成了

图片

然后进入rocketmq-all-4.7.1-bin-release文件夹

cd rocketmq-all-4.7.1-bin-release

RocketMQ的东西都在这了

图片

二、搭建NameServer

修改jvm参数

在启动NameServer之前,强烈建议修改一下启动时的jvm参数,因为默认的参数都比较大,为了避免内存不够,建议修改小,当然,如果你的内存足够大,可以忽略。

vi bin/runserver.sh

修改画圈的这一行

图片

这里你可以直接修改成跟我一样的

-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=50m
启动NameServer

修改完之后,执行如下命令就可以启动NameServer了

nohup sh bin/mqnamesrv &

查看NameServer日志

tail -f ~/logs/rocketmqlogs/namesrv.log

如果看到如下的日志,就说明启动成功了

图片

NameServer日志

三、搭建Broker

这里启动单机版的Broker

修改jvm参数

跟启动NameServer一样,也建议去修改jvm参数

vi bin/runbroker.sh

将画圈的地方设置小点,当然也别太小啊

图片

当然你还是可以跟我设置的一样

-server -Xms1g -Xmx1g -Xmn512m
修改Broker配置文件broker.conf

这里需要改一下Broker配置文件,需要指定NameServer的地址,因为需要Broker需要往NameServer注册

vi conf/broker.conf

Broker配置文件

图片

Broker配置文件

这里就能看出Broker的配置了,什么Broker集群的名称啊,Broker的名称啊,Broker的id啊,都跟前面说的对上了。

在文件末尾追加地址

namesrvAddr = localhost:9876

因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。

不过这里我还建议再修改一处信息,因为Broker向NameServer进行注册的时候,带过去的ip如果不指定就会自动获取,但是自动获取的有个坑,就是有可能你的电脑无法访问到这个自动获取的ip,所以我建议手动指定你的电脑可以访问到的服务器ip。

我的虚拟机的ip是192.168.200.143,所以就指定为192.168.200.143,如下

brokerIP1 = 192.168.200.143
brokerIP2 = 192.168.200.143

如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的

图片

启动Broker
nohup sh bin/mqbroker -c conf/broker.conf &

-c 参数就是指定配置文件

查看日志

tail -f ~/logs/rocketmqlogs/broker.log

当看到如下日志就说明启动成功了

图片

四、搭建可视化控制台

其实前面NameServer和Broker搭建完成之后,就可以用来收发消息了,但是为了更加直观,可以搭一套可视化的服务。

可视化服务其实就是一个jar包,启动就行了。

jar包可以从这获取

链接:https://pan.baidu.com/s/16s1qwY2qzE2bxR81t5Wm6w
提取码:s0sd

将jar包上传到服务器,放到/usr/rocketmq的目录底下,当然放哪都无所谓,这里只是为了方便,因为rocketmq的东西都在这里

然后进入/usr/rocketmq下,执行如下命名

nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &

rocketmq.config.namesrvAddr就是用来指定NameServer的地址的

查看日志

tail -f ~/logs/consolelogs/rocketmq-console.log

当看到如下日志,就说明启动成功了

图片

然后在浏览器中输入http://linux服务器的ip:8088/就可以看到控制台了,如果无法访问,可以看看防火墙有没有关闭

图片

右上角可以把语言切换成中文

图片

Broker集群信息

图片

topic信息

通过控制台可以查看生产者、消费者、Broker集群等信息,非常直观。

功能很多,这里就不一一介绍了。

测试

环境搭好之后,就可以进行测试了。

引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version>
</dependency>
生产者发送消息
public class Producer {public static void main(String[] args) throws Exception {//创建一个生产者,指定生产者组为sanyouProducerDefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");// 指定NameServer的地址producer.setNamesrvAddr("192.168.200.143:9876");// 第一次发送可能会超时,我设置的比较大producer.setSendMsgTimeout(60000);// 启动生产者producer.start();// 创建一条消息// topic为 sanyouTopic// 消息内容为 三友的java日记// tags 为 TagAMessage msg = new Message("sanyouTopic", "TagA", "三友的java日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息并得到消息的发送结果,然后打印SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);// 关闭生产者producer.shutdown();}}
  • 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为sanyouProducer;

  • 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息

  • producer.start() 启动生产者

  • 构建一个内容为三友的java日记的消息,然后指定这个消息往sanyouTopic这个topic发送

  • producer.send(msg):发送消息,打印结果

  • 关闭生产者

运行结果如下

SendResult [sendStatus=SEND_OK, msgId=C0A81FAF54F818B4AAC2475FD2010000, offsetMsgId=C0A8C88F00002A9F000000000009AE55, messageQueue=MessageQueue [topic=sanyouTopic, brokerName=broker-a, queueId=0], queueOffset=0]

sendStatus=SEND_OK 说明发送成功了,此时就能后控制台看到未消费的消息了。

图片

到控制台看到消息那块,然后选定发送的topic,查询的时间范围手动再选一下,不选就查不出来(我怀疑这是个bug),然后查询就能看到了一条消息。

然后点击一下MESSAGE DETAIL就能够看到详情。

图片

这里就能看到发送消息的详细信息。

左下角消息的消费的消费,因为我们还没有消费者订阅这个topic,所以左下角没数据。

消费者消费消息
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 通过push模式消费消息,指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");// 指定NameServer的地址consumer.setNamesrvAddr("192.168.200.143:9876");// 订阅这个topic下的所有的消息consumer.subscribe("sanyouTopic", "*");// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.printf("Consumer Started.%n");}
}
  • 创建一个消费者实例对象,指定消费者组为sanyouConsumer

  • 指定NameServer的地址:服务器的ip:9876

  • 订阅 sanyouTopic 这个topic的所有信息

  • consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。

  • 启动消费者

启动之后,消费者就会消费刚才生产者发送的消息,于是控制台就打印出如下信息

Consumer Started.
消费消息:三友的java日记 

此时再去看控制台

图片

发现被sanyouConsumer这个消费者组给消费了。

SpringBoot环境下集成RocketMQ

集成

在实际项目中肯定不会像上面测试那样用,都是集成SpringBoot的。

1、引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.1.1.RELEASE</version>
</dependency>
2、yml配置
rocketmq:producer:group: sanyouProducername-server: 192.168.200.143:9876
3、创建消费者

SpringBoot底下只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可

@Component
@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouTopic")
public class SanYouTopicListener implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {System.out.println("处理消息:" + msg);}}

@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类

4、测试
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
public class RocketMQTest {@Autowiredprivate RocketMQTemplate template;@Testpublic void send() throws InterruptedException {template.convertAndSend("sanyouTopic", "三友的java日记");Thread.sleep(60000);}}

直接注入一个RocketMQTemplate,然后通过RocketMQTemplate发送消息。

运行结果如下:

处理消息:三友的java日记

的确消费到消息了。

原理

其实原理是一样的,只不过在SpringBoot中给封装了一层,让使用起来更加简单。

1、RocketMQTemplate构造代码

图片

所以从这可以看出,最终在构造RocketMQTemplate的时候,传入了一个DefaultMQProducer,所以可想而知,最终RocketMQTemplate发送消息也是通过DefaultMQProducer发送的。

2、@RocketMQMessageListener 注解处理

图片

从这可以看出,会为每一个加了@RocketMQMessageListener注解的对象创建一个DefaultMQPushConsumer,所以最终也是通过DefaultMQPushConsumer消费消息的。

至于监听器,是在这

图片

遍历每条消息,然后调用handleMessage,最终会调用实现了RocketMQListener的对象处理消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/163151.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

为Oracle链接服务器使用分布式事务

1 现象 在SQL Server中创建指向Oracle的链接服务器&#xff0c;SQL语句在事务中向链接服务器插入数据。返回链接服务器无法启动分布式事务的报错。 2 解决 在Windows平台下&#xff0c;SQL Server依赖分布式事务协调器&#xff08;MSDTC&#xff09;来使用分布式事务&#xff0…

关于APP备案的通知以及APP备案的常见问题

前言 众所周知今年8月份&#xff0c;工信部出台了《工业和信息化部关于开展移动互联网应用程序备案工作的通知》&#xff0c;APP开发者的影晌是显而易见的。开发者需要按照要求提交相关材料进行备案&#xff0c;这无疑增加了开发者的时间和精力成本。虽然备案制度会增加开发者…

深度学习之基于Tensorflow卷积神经网络鸟类目标识别检测系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 基于Tensorflow的卷积神经网络&#xff08;Convolutional Neural Networks&#xff0c;CNN&#xff09;在鸟类目标识…

MAX/MSP SDK学习06:内存管理

提供两种内存分配方式&#xff1a;①简单指针&#xff0c;②句柄&#xff08;二级指针&#xff09;&#xff1b;官方文档建议使用前者。 // 简单指针 char *ptr; ptr sysmem_newptr(2000); post("I have a pointer %lx and it is %ld bytes in size",ptr, sysmem_p…

opencv-分水岭算法分割

原理 任何一副灰度图像都可以被看成拓扑平面&#xff0c;灰度值高的区域可以被看成是山峰&#xff0c;灰度值低的区域可以被看成是山谷。我们向每一个山谷中灌不同颜色的水。随着水的位的升高&#xff0c;不同山谷的水就会相遇汇合&#xff0c;为了防止不同山谷的水汇合&#x…

ios(swiftui) 画中画

一、环境 要实现画中画 ios系统必须是 iOS14 本文开发环境 xcode14.2 二、权限配置 在项目导航器中单击项目&#xff0c;然后单击Signing & Capabilities。单击 Capabilit搜索Background Modes&#xff0c;然后双击将其添加为功能。在新添加的Background Modes部分&a…

Pyqt5实现多线程程序

主从架构 Pyqt常常使用**主从架构&#xff08;Master-Workers 架构&#xff09;**来避免界面卡死的情况。 Master-Workers 架构就像它的名字&#xff0c;一个master统领着几个workers一起干活。其中某个worker倒下了不会导致整体任务失败。matser不用干活&#xff0c;因此可以…

分布式锁之基于redis实现分布式锁(二)

2. 基于redis实现分布式锁 2.1. 基本实现 借助于redis中的命令setnx(key, value)&#xff0c;key不存在就新增&#xff0c;存在就什么都不做。同时有多个客户端发送setnx命令&#xff0c;只有一个客户端可以成功&#xff0c;返回1&#xff08;true&#xff09;&#xff1b;其他…

市场是变化的?这种悖论fpmarkets澳福一秒打破

你是不是始终认为市场是经常变化的&#xff0c;其实这是不对的&#xff0c;这种认识fpmarkets澳福今天一秒打破。 市场经常变化吗?众多投资者无需过多思考&#xff0c;就认为答案是肯定的。因为无论是在互联网的哪个角落&#xff0c;都可以看到这样的信息。即使我们没有深入研…

Python---函数的嵌套(一个函数里面又调用了另外一个函数)详解

函数嵌套调用------就是一个函数里面又调用了另外一个函数。 基本语法&#xff1a; # 定义 函数B def funcB():print(这是funcB函数的函数体部分...)# 定义 函数A def funcA():print(- * 80) # 这一行为了更好区分print(这是funcA函数的函数体部分...)# 假设我们在调用funcA…

Ubuntu18 Opencv3.4.12 viz 3D显示安装、编译、使用、移植

Opencv3.*主模块默认包括两个3D库 calib3d用于相机校准和三维重建 &#xff0c;viz用于三维图像显示&#xff0c;其中viz是cmake选配。 参考&#xff1a; https://docs.opencv.org/3.4.12/index.html 下载linux版本的源码 sources。 查看cmake apt list --installed | grep…

App Cleaner Uninstaller Pro 一键清理,彻底卸载Mac应用

随着科技的不断发展&#xff0c;Mac电脑已经成为许多用户工作和娱乐的首选。然而&#xff0c;随着时间的推移&#xff0c;我们的Mac电脑上可能会堆积大量的无效文件和冗余数据&#xff0c;这不仅占用了宝贵的磁盘空间&#xff0c;还可能影响到系统的运行速度。为了解决这一问题…

基于51单片机zigbee温室大棚监控系统

**单片机设计介绍&#xff0c;基于51单片机zigbee温室大棚监控系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于51单片机和Zigbee技术的温室大棚监控系统是一种用于监测和控制温室大棚环境的设备。以下是一个基本的设计介…

STM32 CAN通信自定义数据包多帧连发乱序问题

场景&#xff1a; can标准帧中每一帧只能传输8字节&#xff0c;而应用中传输一包的内容往往超过8字节&#xff0c;因此需要把一个包拆成多个帧发送&#xff0c;接收端才把收到的多帧重新组装成一个完整的包 问题描述 在一问一答的两块板间通信&#xff0c;多帧连发是能够按照…

信创系列之大数据,分布式数据库产业链跟踪梳理笔记…

并购优塾 投行界的大叔&#xff0c;大叔界的投行 【产业链地图&#xff0c;版权、内容与免责声明】1&#xff09;版权&#xff1a;版权所有&#xff0c;违者必究&#xff0c;未经许可不得翻版、摘编、拷贝、复制、传播。2&#xff09;尊重原创&#xff1a;如有引用未标注来源…

CentOS 7启动时报“Started Crash recovery kernel arming.....shutdown....”问题处理过程

有台虚拟机由于CPU负载过高而宕机&#xff0c;宕机重启后停在“Started Crash recovery kernel arming…shutdown…”阶段&#xff0c;如下所示&#xff1a; 重置虚拟机&#xff0c;进入grub菜单&#xff0c;按e编辑启动选项&#xff0c;在linux16 行末&#xff0c;加上&…

【考研】数据结构(更新到循环链表)

声明&#xff1a;所有代码都可以运行&#xff0c;可以直接粘贴运行&#xff08;只有库函数没有声明&#xff09; 线性表的定义和基本操作 基本操作 定义 静态&#xff1a; #include<stdio.h> #include<stdlib.h>#define MaxSize 10//静态 typedef struct{int d…

【追求卓越02】数据结构--链表

引导 今天我们进入链表的学习&#xff0c;我相信大家对链表都很熟悉。链表和数组一样&#xff0c;作为最基础的数据结构。在我们的工作中常常会使用到。但是我们真的了解到数组和链表的区别吗&#xff1f;什么时候使用数组&#xff0c;什么时候使用链表&#xff0c;能够正确的选…

监控员工上网有什么软件

监控员工上网的软件主要用于监控员工在工作时间内的网络行为&#xff0c;包括浏览网页、使用社交媒体、发送邮件等。通过监控员工上网行为&#xff0c;企业管理者可以更好地了解员工的工作状态和行为&#xff0c;规范员工的上网行为&#xff0c;提高工作效率&#xff0c;同时也…

SSL证书对网站的作用及影响?

SSL证书作为当下互联网的重要安全件&#xff0c;包括搜索引擎的收录、网站是否具备信任的条件以及HTTP2.0传输协议的相互作用等&#xff0c;尤其是浏览器对古老的http协议警告提示不安全将直接影响到用户的信任度以及品牌形象&#xff0c;对于网站来说可谓是必不可少。 SSL证书…