RocketMQ5-03RocketMQ-Dashboard和Java客户端访问示例

接上篇02快速部署RocketMQ5.x(手动和容器部署)
已经完成 RocketMQ5.0 环境的部署,就需要对这个环境进行测试,查看集群、写入消息、读取消息等

本篇教你如何使用和查看部署的服务:

  • Docker部署 Dashboard
    • 获取镜像并下载
    • 部署服务
  • 客户端连接
    • pom文件
    • 生产者代码
    • 消费者代码
    • 接口测试
    • 问题: broker资源不足无法提供服务

Docker部署 Dashboard

以上通过可执行文件部署或者容器部署的形式,都需要有一个可以查看的集群的地方,对于官方自己配备的有 rocketmq-dashboard, 可以使用docker快速部署,便于测试

获取镜像并下载

docker search rocketmq-dashboard & docker pull apacherocketmq/rocketmq-dashboard

部署服务

docker run -d --name rmqdashboard -e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=192.168.2.92:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8088:8080 apacherocketmq/rocketmq-dashboard

这边将端口映射到了8088,所以访问 localhost:8088,就可以查看到集群,如果有数据正在写入与读取,就能够大概看到数据量
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

客户端连接

手动创建 topic: sh bin/mqadmin updatetopic -n 192.168.2.92:9876 -t dataTopic2 -c DefaultCluster

pom文件

 <properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>3.0.2</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>17</source><target>17</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.learning.springbootrmq5.SpringbootRmq5Application</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build>

生产者代码

 @GetMapping("/sendSync")public String sendSync() throws ClientException, IOException {String endpoint = "192.168.2.92:8081";String topic = "dataTopic2";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.enableSsl(true).build();Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey").setTag("messageTag").setBody("messageBodySync".getBytes()).build();try {SendReceipt sendReceipt = producer.send(message);log.info("Send sync message successfully, messageId={}", sendReceipt.getMessageId());} catch (ClientException e) {log.error("Failed to send message", e);}producer.close();return "success";}@GetMapping("/sendAsync")public String sendAsync() throws ClientException, InterruptedException, IOException {String endpoint = "192.168.2.92:8081";String topic = "dataTopic2";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().enableSsl(true).setEndpoints(endpoint);ClientConfiguration configuration = builder.build();Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey").setTag("messageTag").setBody("messageBodyASync".getBytes()).build();producer.sendAsync(message);log.info("Send async message successfully, messageId");return "success";}

消费者代码

@Slf4j
@Component
public class MessageConsumerRunner implements CommandLineRunner {@Overridepublic void run(final String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();String endpoints = "192.168.2.92:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);String consumerGroup = "YourConsumerGroup";String topic = "dataTopic2";PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(consumerGroup).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setMessageListener(messageView -> {log.info("Consume message successfully, messageId={}", messageView.getMessageId());return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);}
}

接口测试

请求接口 /msg/sendAsync
在这里插入图片描述

能够正常收发

问题: broker资源不足无法提供服务

可能出现的客户端报错为:

org.apache.rocketmq.client.java.exception.InternalErrorException: [request-id=e3f9dxxxx1aa872, response-code=50001] org.apache.rocketmq.proxy.common.ProxyException: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL:  0.96 CQ:  0.96 INDEX: -1.00], messages are put to the slave, message store has been shut down, etc.java.util.concurrent.RejectedExecutionException: Task org.apache.rocketmq.shaded.io.grpc.internal.DelayedStream$4@72ba34c2 rejected from java.util.concurrent.ThreadPoolExecutor@7deb0119[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 13]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[na:na]

以上大体就是描述资源不足无法进行接入、服务不可达等,通常就是因为环境的资源不足,可能是内存、可能是硬盘

.../broker/logs/rocketmqlogs/store.log 中可以看出端倪,是磁盘存储不够了

2024-01-08 13:34:24 ERROR StoreScheduledThread1 - physic disk maybe full soon 0.95, so mark disk full, storePathPhysic=/home/rocketmq/store/commitlog

可以通过清除以下数据暂时缓解 .../broker/store/commitlog,可以发现没怎么用也有好多G。不过确实需要使用的话尽早考虑扩容啊

扩大存储增加可用磁盘空间,就能够正常使用连接了

如果这篇文章对你有用的话,帮忙留个关注吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/608788.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

哈希-力扣01两数之和

题目 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元素在答案里不能重复出现。 你可以按任意顺…

spring boot 2升级为spring boot 3中数据库连接池druid的问题

目录 ConfigurationClassPostProcessor ConfigurationClassBeanDefinitionReader MybatisPlusAutoConfiguration ConditionEvaluator OnBeanCondition 总结 近期给了一个任务&#xff0c;要求是对现有的 spring boot 2.x 项目进行升级&#xff0c;由于 spring boot 2.x 版…

Linux Capabilities 进阶实战

目录 1. 快速回顾 2. 为可执行文件分配 capabilities 3. 构建半特权环境 4. 容器与 capabilities Linux Capabilities 基础概念与基本使用 上一篇学习了LinuxCapabilities的基础知识和基本使用&#xff0c;因为后面需要学习Docker的逃逸&#xff0c;理解Linux Capabilitie…

忆阻器芯片STELLAR权重更新算法(清华大学吴华强课题组)

参考文献&#xff08;清华大学吴华强课题组&#xff09; Zhang, Wenbin, et al. “Edge learning using a fully integrated neuro-inspired memristor chip.” Science 381.6663 (2023): 1205-1211. STELLAR更新算法原理 在权值更新阶段&#xff0c;只需根据输入、输出和误差…

在python里面探索web框架

一、常识性知识 python Web框架三巨头&#xff1a;Flask&#xff08;简单易学&#xff09;、Django(复杂庞大)、FastAPI 1. Django&#xff1a;Django是一个高级的Web框架&#xff0c;它提供了强大的功能和工具&#xff0c;用于快速开发复杂的Web应用程序。 2. Flask&#xff…

【Java集合篇】ConcurrentHashMap是如何保证线程安全的

ConcurrentHashMap是如何保证线程安全的 ✔️典型解析✔️ 拓展知识仓✔️ 什么是CAS&#xff08;Compare And Swap&#xff09;✔️CAS和互斥量有什么区别✔️如何使用CAS和互斥量 ✔️CAS和Synchronized的区别✔️ConcurrentHashMap的优缺点✔️能用ConcurrentHashMap实现队列…

python对常见的激活函数绘图操作(详细代码讲解)

写论文的时候需要做一些激活函数的图像&#xff0c;为此将常见的激活函数进行整理汇总了一下&#xff0c;方便后续的复习 激活函数的作用是为让模型处理非线性问题&#xff0c;故次激活函数都是非线性的 生活中&#xff0c;非线性问题占大多数&#xff0c;而模型的训练通常都是…

哈希表-散列表数据结构

1、什么是哈希表&#xff1f; 哈希表也叫散列表&#xff0c;哈希表是根据关键码值(key value)来直接访问的一种数据结构&#xff0c;也就是将关键码值(key value)通过一种映射关系映射到表中的一个位置来加快查找的速度&#xff0c;这种映射关系称之为哈希函数或者散列函数&…

Rollup-plugin-bundle-analyzer VS Rollup-plugin-visualizer

分析和可视化Rollup打包后的文件的插件 Rollup-plugin-bundle-analyzerRollup-plugin-visualizer Rollup-plugin-bundle-analyzer和Rollup-plugin-visualizer都是用于分析和可视化Rollup打包后的文件的插件&#xff0c;但它们在功能和使用方式上存在一些差异。 Rollup-plugi…

PostGIS教程学习十九:基于索引的聚簇

PostGIS教程学习十九&#xff1a;基于索引的聚簇 数据库只能以从磁盘获取信息的速度检索信息。小型数据库将完全位于于RAM缓存&#xff08;内存&#xff09;&#xff0c;并摆脱物理磁盘访问速度慢的限制。但是对于大型数据库&#xff0c;对物理磁盘的访问将限制数据库的信息检…

FFmpeg获取音视频流信息

文章目录 前言一、需求二、源码三、运行结果 前言 本文记录用 FFmpeg 获取视频流音频流的信息&#xff08;编码格式、分辨率、帧率、播放时长…&#xff09;&#xff0c;所用的工程基于上个博客编译成功的工程&#xff1a;使用FFmpeg4.3.1的SDK官方开发包编译ffmpeg.c 一、需求…

透明OLED拼接屏:重塑大屏显示的新篇章

随着科技的快速发展&#xff0c;大屏显示技术已经逐渐渗透到我们生活的方方面面。作为显示技术领域的一大革新&#xff0c;透明OLED拼接屏以其独特的透明显示特性&#xff0c;正逐渐成为大屏显示市场的新宠。尼伽小编将深入探讨透明OLED拼接屏的技术特点、应用场景以及市场前景…

中国葡萄酒消费者的口味偏好

有一段时间&#xff0c;“中国口味”的问题是全世界葡萄酒销售者的热门话题&#xff0c;因为他们积极探索每一个线索&#xff0c;以发现让他们在市场上领先的秘密。为此进行了大量研究&#xff0c;多年前葡萄酒销售商或多或少形成了一个共识&#xff1a;尽管中国人的口味差异很…

系列十四、while do...while switch模板代码

一、while & do...while & switch模板代码 1.1、while /*** 需求&#xff1a;使用while循环打印5遍Hello World!*/ Test public void print5() {int i 1;while (i < 5) {System.out.println("Hello World! " LocalDateTime.now());// 线程休眠&#x…

Spring MVC学习之——了解MVC设计模式

MVC设计模式 MVC介绍 MVC 模式代表 Model-View-Controller&#xff08;模型-视图-控制器&#xff09; 模式。这种模式用于应用程序的分层开发。 Model&#xff08;模型&#xff09; - 模型代表一个存取数据的对象或 JAVA POJO。它也可以带有逻辑&#xff0c;在数据变化时更新…

Java项目:112SSM在线电影订票系统

博主主页&#xff1a;Java旅途 简介&#xff1a;分享计算机知识、学习路线、系统源码及教程 文末获取源码 一、项目介绍 在线电影订票系统基于SpringSpringMVCMybatis开发&#xff0c;系统分为前台和后台&#xff0c;前台主要用来用户浏览电影信息&#xff0c;订票&#xff0c…

人工智能任务2-读懂Transformer模型的十个灵魂拷问问题,深度理解Transformer模型架构

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下人工智能任务2-读懂Transformer模型的十个灵魂拷问问题&#xff0c;深度理解Transformer模型架构。Transformer模型是一种基于自注意力机制的神经网络架构&#xff0c;被广泛用于自然语言处理任务中&#xff0c;如机…

系分笔记数据库技术之数据库安全措施

文章目录 1、概要2、数据库的保护措施3、数据库的故障4、备份和日志5、总结 1、概要 数据库设计是考试重点&#xff0c;常考和必考内容&#xff0c;本篇主要记录了知识点&#xff1a;数据库故障及解决、数据库安全保护措施和数据库备份及恢复。 2、数据库的保护措施 数据库安全…

云卷云舒:【实战篇】云主机/虚拟机迁移

1. 简介 用户原有业务通过不同版本型号、不同操作系统的主机承载&#xff0c;形式上包括物理服务器、虚拟机、公有云主机等。随着业务不断扩张&#xff0c;需要将其业务云化转型&#xff0c;必须保证上云过程数据完整&#xff0c;业务平滑过度。 如果将所有业务系统都重新部署…

以太坊开发者会议回顾:坎昆升级、硬分叉与布拉格

作者&#xff1a;Christine Kim Galaxy研究副总裁 编译&#xff1a;秦晋 碳链价值 2024年1月4日&#xff0c;以太坊开发人员齐聚Zoom for All Core Developers Execution (ACDE) Call #178 上。ACDE电话会议通常由以太坊基金会协议负责人Tim Beiko主持&#xff0c;是一个开发人…