RocketMQ单机部署完整学习笔记

文章目录

  • 前言
  • 一、RocketMQ是什么?
  • 二、使用步骤
    • 1.安装MQ
      • 1.安装JDK
      • 2.安装mq
      • 3.MQ配置(核心)
    • 2.搭建可视化dashboard
      • 1.下载源码
      • 2.修改配置
      • 3.启动
    • 3.整合java
      • 1.生产者
      • 2.消费者
      • 3.启动生产者
      • 4.启动消费者
      • 5.dashboard添加消费组
  • 三、总结
    • 全部的配置

前言

本文是基于4.X版本RocketMQ,从MQ的搭建,消息推送和消费以及dashboard的使用

一、RocketMQ是什么?

参考文档 https://rocketmq.apache.org/zh/docs/4.x/
重点角色如下

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息;举例
  • tag:消息标签,方便服务器过滤使用

二、使用步骤

1.安装MQ

首先安装jdk 再安装mq

1.安装JDK

  1. 查看Linux系统是否有自带的jdk
java -version

如果有 输入 rpm -qa | grep java 检测jdk的安装包
接着进行一个个删除包,输入:rpm -e --nodeps +包名
最后再次:rpm -qa | grep java检查是否删除完即可

  1. 下载jdk

https://www.oracle.com/java/technologies/downloads/#java8
资源连接地址(jdk和yarn)
https://download.csdn.net/download/HBliucheng/88696153
3. 上传jdk到linux服务器
在这里插入图片描述

  1. 解压jdk
tar -zvxf jdk-8u241-linux-x64..tar.gz
  1. 配置环境变量
    用vim /etc/profile进入编辑状态,加入下边这段配置
    JAVA_HOME 根据自己的解压路径来写
JAVA_HOME=/usr/local/jdk/jdk1.8.0_241
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
  1. 重新加载配置
source /etc/profile
  1. 进行测试
java -version

在这里插入图片描述

2.安装mq

  1. 下载mq
    连接 https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

  2. 解压上传
    我下载的是公司的mq是4.8的,官网链接给的是根4.9的,这个问题不大,不影响

目录结构如下
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

  1. 配置环境变量
vim /etc/profile
# 在末尾加入下面配置 路径和自己解压的mq路径一直 上一步有截图
export ROCKETMQ_HOME=/bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
# 使环境变量生效
source /etc/profile

3.MQ配置(核心)

这一步很重要,配置完这里,那mq就算部署好了

  1. 修改runserver.sh
    默认配置比较大,修改启动大小
## cd /bin
vim runserver.sh
## 修改启动大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  1. 启动服务nameserver
## 启动
nohup sh bin/mqnamesrv &
## 关闭
sh bin/mqshutdown namesrv

出现下面就算启动成功了
The Name Server boot success. serializeType=JSON
注意目录别进错了
在这里插入图片描述

  1. 指定NameServer地址
    相当于 broker注册到nameserver上
vim /etc/profile
# 在末尾加入下面配置 有多个时以分号隔开,这个是集群时使用的 mq端口默认是9876 
# 192.168.141.101是服务器地址
export NAMESRV_ADDR=192.168.141.101:9876
# 使环境变量生效
source /etc/profile
  1. 修改 runbroker.sh

修改启动参数

vim runbroker.sh
## 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
  1. 修改broker.conf
    重要,核心配置,以后关于mq服务的配置都在这里
    conf目录下
    2m-2s-async
    2m-2s-sync
    2m-noslave
    dledger
    这四个目录是集群配置时会用到,我们这里是单机的先不管
vim broker.confbrokerClusterName = DefaultCluster
brokerName = broker-a
#brokerid,0就表示是Master,>0的都是表示
brokerId = 0
# 这个就是第三三步配置的export NAMESRV_ADDR=192.168.141.101:9876 多个以分号分割
namesrvAddr=192.168.141.101:9876
#如果是多网卡的机器,比如云服务器,那么需要在broker.conf中增加brokerIP1属性,
#指定所在机器的外网网卡地址
brokerIP1=192.168.141.101
#对外服务的监听端口
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable=true
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true 

在这里插入图片描述
在这里插入图片描述

  1. 启动broker
    进入bin目录
    注意 -c 请加上,不加后面客户端使用时可能会出问题
#启动
nohup sh  bin/mqbroker -c conf/broker.conf &
# 关闭
sh bin/mqshutdown broker

查看nohup.out
出现这样的就说明启动成功了
The broker[broker-a, 192.168.141.101:10911] boot success. serializeType=JSON and name server is 192.168.141.101:9876

在这里插入图片描述

  1. 到此mq服务已启动完成
jps

在这里插入图片描述

  1. 查看日志
## 查看namesrv日志tailf /root/logs/rocketmqlogs/namesrv.log ## broker日志tailf /root/logs/rocketmqlogs/broker.log 

其实前面启动的时候查看这里的日志也可以看是否启动没
停掉时先停broker 再停namesrv 启动先启 namesrv 再启动broker 因为broker 需要注册到namesrv 上

  1. 发送和接收消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述

在这里插入图片描述

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

2.搭建可视化dashboard

1.下载源码

点击下面直接下载4.X的
https://github.com/apache/rocketmq-dashboard/tree/release-1.0.0
说下
现在MQ已经到5.X了,但是现在还保留着4的,分支下拉到最后可以看到一个relaese-1.0.0
这个就是4.X用的,下载下来后解压
切记版本要对上,不然你和我一样折腾个一两天

在这里插入图片描述

2.修改配置

主要改下这个

rocketmq.config.namesrvAddr=192.168.141.101:9876

如果自己端口需要修改也可以,我是改成了8078
在这里插入图片描述

3.启动

在这里插入图片描述
访问 http://localhost:8078
在这里插入图片描述

3.整合java

1.生产者

直接上代码

package com.bsoft;import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.IOException;/*** @author:* @time: 2023/12/29 15:40*/
public class MQPublisher {private final static String nameServer = "192.168.141.101:9876";private final static String producerGroup = "my_group";private final static String consumerGroup = "my_group";private final static String topic = "topic_test";public static void main(String[] args) throws IOException {// 初始化一个producer并设置Producer group nameDefaultMQProducer producer = new DefaultMQProducer(producerGroup);try {// 设置NameServer地址producer.setNamesrvAddr(nameServer);// 启动producerproducer.start();// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送消息, 发送结果通过callback返回给客户端producer.send(msg, new SendCallback() {public void onSuccess(SendResult sendResult) {System.out.printf("OK %s %n",sendResult.getMsgId());}public void onException(Throwable e) {System.out.printf("Exception %s %n", e);e.printStackTrace();}},10000);} catch (Exception e) {e.printStackTrace();}System.in.read();}
}

2.消费者

package com.bsoft;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;/*** @author: liucheng* @time: 2023/12/29 15:39*/
public class MQConsumer {private final static String nameServer = "192.168.141.101:9876";private final static String consumerGroup = "my_group_test";private final static String topic = "topic_test";public static void main(String[] args) throws MQClientException, IOException, InterruptedException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup,true);// 设置NameServer的地址consumer.setNamesrvAddr(nameServer);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe(topic, "tagE");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);msgs.forEach((msg)->{byte[] body = msg.getBody();String s = new String(body, Charset.defaultCharset());System.out.println("msg=================> " +s);});// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();System.in.read();}
}

3.启动生产者

在这里插入图片描述

在这里插入图片描述

查看详情
在这里插入图片描述

4.启动消费者

package com.bsoft;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;/*** @author: liucheng* @time: 2023/12/29 15:39*/
public class MQConsumer {private final static String nameServer = "192.168.141.101:9876";private final static String consumerGroup = "my_group_test";private final static String topic = "topic_test";public static void main(String[] args) throws MQClientException, IOException, InterruptedException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);// 设置NameServer的地址consumer.setNamesrvAddr(nameServer);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe(topic, "tagA");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);msgs.forEach((msg)->{byte[] body = msg.getBody();String s = new String(body, Charset.defaultCharset());System.out.println("msg=================> " +s);});// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();System.in.read();}
}

在这里插入图片描述
没日志打印,但是又显示消费了
在这里插入图片描述

5.dashboard添加消费组

没查询到那添加一个
在这里插入图片描述
在这里插入图片描述
重启生产者发现可以消费

对于消费组还是得在dashboard创建好了再去写代码,有的说能够改配置能否直接创建,试过了,没生效,先这样吧,有好的方法或搭建过程中遇到什么问题可以私聊我,看到及时回答

三、总结

整个搭建过程踩了不少坑,比如
版本的不一致导致部分功能一直报错;
启动brocker时未指定实例文件没有加-c来启动导致部署失败;消
费组未在dashboard创建时代码中不显示消费信息
关于5.x无法打包的问题是因为缺少yarn-1.22.10.tar.gz 这个已经上传到jdk那个资源下了,把这个复制到
{path}\maven\repository\com\github\eirslett\yarn\1.22.10再打包即可

全部的配置

#  cat /etc/profileJAVA_HOME=/usr/local/jdk/jdk1.8.0_241
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
export ROCKETMQ_HOME=/bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
export NAMESRV_ADDR=192.168.141.101:9876
#  cat runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
# cat runserver.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"# cat broker.conf 
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=192.168.141.101:9876
brokerIP1=192.168.141.101
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable=true
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/595512.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

静态网页设计——电影角(HTML+CSS+JavaScript)

前言 声明&#xff1a;该文章只是做技术分享&#xff0c;若侵权请联系我删除。&#xff01;&#xff01; 使用技术&#xff1a;HTMLCSSJS 主要内容&#xff1a;本网页主要利用HTML语言编写&#xff0c;简要介绍世界上一些主要国家&#xff0c;例如&#xff0c;中&#xff0c;…

【已解决】You have an error in your SQL syntax

报错讯息 java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ‘desc,target_url,sort,status,create_by,modify_by,created,last_update_time FROM…

跑通CLAM

项目场景&#xff1a; 从github上下载CLAM代码&#xff0c;上传Camelyon-16中的部分WSI图像&#xff0c;将代码跑通。 CLAM项目地址&#xff1a; GitHub - mahmoodlab/CLAM: Data-efficient and weakly supervised computational pathology on whole slide images - Nature …

高级RAG(四):RAGAs评估

之前我完成了父文档检索器和llamaIndex从小到大的检索这两篇博客&#xff0c;我在这两篇博客中分别介绍了使用langchain和llamaIndex进行文档检索的方法和步骤&#xff0c;其中包含了不同的RAG的检索策略&#xff0c;通常来说一个典型的RAG系统一般包含两个主要的部件&#xff…

程序媛的mac修炼手册--MacOS系统更新升级史

啊&#xff0c;我这个口罩三年从未感染过新冠的天选免疫王&#xff0c;却被支原体击倒&#x1f637;大意了&#xff0c;前几天去医院体检&#xff0c;刚检查完出医院就摘口罩了&#x1f926;大伙儿还是要注意戴口罩&#xff0c;保重身体啊&#xff01;身体欠恙&#xff0c;就闲…

‘react-native‘ 不是内部或外部命令,也不是可运行的程序或批处理文件。

原因&#xff1a;没有下载react-native 解决下载react-native npm i -g react-native-cli

向日葵远程工具的使用Mysql5.7的安装与配置

目录 一、向日葵远程安装与使用 二、Mysql 5.7 安装与配置 2.1 安装 2.2 Navicat Premium 12 测试连接 本机测试连接 外部访问MySQL测试连接 三、思维导图 一、向日葵远程安装与使用 简介&#xff1a; 向日葵远程控制是一款用于对远程PC进行管理和服务的软件,拥有5秒快速…

uniapp从入门到精通(全网保姆式教程)~ 别再说你不会开发小程序了

目录 一、介绍 二、环境搭建&#xff08;hello world&#xff09; 2.1 下载HBuilderX 2.2 下载微信开发者工具 2.3 创建uniapp项目 2.4 在浏览器运行 2.5 在微信开发者工具运行 2.6 在手机上运行 三、项目基本目录结构 四、开发规范概述 五、全局配置文件&#xff0…

移动神器RAX3000M路由器变身家庭云之四:开放LuCI管理界面,网站服务

前面已经改造成了家庭云供外网访问了。由于这个路由本来就是openwrt&#xff0c;openwrt本身的管理界面LuCI-admin很好用&#xff0c;但被屏蔽了&#xff0c;需要打开。 打开界面 ssh登录路由器&#xff0c;修改 /etc/config/uhttpd配置文件如下&#xff1a; config uhttpd …

【JUC】Synchronized及JVM底层原理

Synchronized使用方式 Synchronized有三种应用方式 作用于实例方法&#xff0c;当前示实例加锁进入同步代码前要获得当前实例的锁&#xff0c;即synchronized普通同步方法&#xff0c;调用指令将会检查方法的ACC_SYNCHRONIZED访问标志是否被设置。 如果设置了&#xff0c;执行…

金融中IC和IR的定义

当谈到金融领域时&#xff0c;IC&#xff08;Information Coefficient&#xff09;和IR&#xff08;Information Ratio&#xff09;通常是用来评估投资组合管理绩效的指标。它们都涉及到投资者对信息的利用和管理的效果。 信息系数&#xff08;IC - Information Coefficient&a…

基于ssm毕业设计选题系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本毕业设计选题系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息…

软文推广宣发遵循的基本流程

在软文发稿的旅程中&#xff0c;制定明确的策略思路是确保成功的关键。软文发稿有一定的流程需要我们遵循&#xff0c;才能达到理想的软文宣发效果。首先&#xff0c;我们要明确发稿的地区&#xff0c;然后精准选择目标受众&#xff0c;最后才能展开内容的创作。下面&#xff0…

实验笔记之——下载数据到服务器

开发过程中经常需要把数据传到服务器上&#xff0c;太麻烦了&#xff0c;为此本博文记录采用百度云来传输数据 百度云 使用bypy包。 安装&#xff1a;pip install bypy 配置bypy连接百度网盘&#xff1a; 终端输入bypy info将命令行提示的链接复制到浏览器&#xff0c;并复制…

一文读懂$mash 通证的 “Fair Launch” 规则,将公平发挥极致

Solmash 是Solana生态中由社区主导的铭文资产LaunchPad平台&#xff0c;该平台旨在为Solana原生铭文项目&#xff0c;以及通过其合作伙伴SoBit跨链桥桥接到Solana的Bitcoin生态铭文项目提供更广泛的启动机会。有了Solmash&#xff0c;将会有更多的Solana生态的铭文项目、资产通…

工业物联网上篇——什么是IIOT?

工业物联网背后的理念是使用工业设施中“哑巴设备”多年来产生的数据。装配线上的智能机器不仅可以更快地捕获和分析数据&#xff0c;且在交流重要信息方面也更快&#xff0c;这有助于更快、更准确地做出业务决策。 信息技术&#xff08;IT&#xff09;和运营技术&#xff08;O…

MySQL取出N列里最大or最小的一个数据

如题&#xff0c;现在有3列&#xff0c;都是数字类型&#xff0c;要取出这3列里最大或最小的的一个数字 -- N列取最小 SELECT LEAST(temperature_a,temperature_b,temperature_c) min FROM infrared_heat-- N列取最大 SELECT GREATEST(temperature_a,temperature_b,temperat…

Hadoop之MapReduce 详细教程

MapReduce仅作了解&#xff0c;生产上很少使用该计算程序 1、MapReduce介绍 MapReduce 思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”&#xff0c;适用于大量复杂的任务处理场景&#xff08;大规模数据处理场景&#xff09;。即使是…

Centos 磁盘挂载和磁盘扩容(新加硬盘方式)

步骤总结如下 一、对磁盘进行分区 二、对磁盘进行格式化 三、将磁盘挂载到对应目录 四、做开机自动挂载磁盘 磁盘分区 1.使用命令&#xff1a;fdisk -l 查看磁盘&#xff08;注&#xff1a;正常在Centos7中第一块数据盘标识一般是/dev/sda,第二块数据盘标识一般是/dev/sdb&…

WebGIS开发的常见框架及优缺点

WebGIS开发引擎的发展历程&#xff1a; 内容来自公众号&#xff1a;Spatial Data 地图API分类 WebGIS系统通常都围绕地图进行内容表达&#xff0c;但并不是有地图就一定是WebGIS&#xff0c;所以下面要讨论下基于Web的地图API分类及应用场景。Web上的Map API主要分类&#xff…