RocketMQ5-03RocketMQ-Dashboard和Java客户端访问示例

接上篇02快速部署RocketMQ5.x(手动和容器部署)
已经完成 RocketMQ5.0 环境的部署,就需要对这个环境进行测试,查看集群、写入消息、读取消息等

本篇教你如何使用和查看部署的服务:

  • Docker部署 Dashboard
    • 获取镜像并下载
    • 部署服务
  • 客户端连接
    • pom文件
    • 生产者代码
    • 消费者代码
    • 接口测试
    • 问题: broker资源不足无法提供服务

Docker部署 Dashboard

以上通过可执行文件部署或者容器部署的形式,都需要有一个可以查看的集群的地方,对于官方自己配备的有 rocketmq-dashboard, 可以使用docker快速部署,便于测试

获取镜像并下载

docker search rocketmq-dashboard & docker pull apacherocketmq/rocketmq-dashboard

部署服务

docker run -d --name rmqdashboard -e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=192.168.2.92:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8088:8080 apacherocketmq/rocketmq-dashboard

这边将端口映射到了8088,所以访问 localhost:8088,就可以查看到集群,如果有数据正在写入与读取,就能够大概看到数据量
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

客户端连接

手动创建 topic: sh bin/mqadmin updatetopic -n 192.168.2.92:9876 -t dataTopic2 -c DefaultCluster

pom文件

 <properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>3.0.2</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>17</source><target>17</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.learning.springbootrmq5.SpringbootRmq5Application</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build>

生产者代码

 @GetMapping("/sendSync")public String sendSync() throws ClientException, IOException {String endpoint = "192.168.2.92:8081";String topic = "dataTopic2";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.enableSsl(true).build();Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey").setTag("messageTag").setBody("messageBodySync".getBytes()).build();try {SendReceipt sendReceipt = producer.send(message);log.info("Send sync message successfully, messageId={}", sendReceipt.getMessageId());} catch (ClientException e) {log.error("Failed to send message", e);}producer.close();return "success";}@GetMapping("/sendAsync")public String sendAsync() throws ClientException, InterruptedException, IOException {String endpoint = "192.168.2.92:8081";String topic = "dataTopic2";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().enableSsl(true).setEndpoints(endpoint);ClientConfiguration configuration = builder.build();Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();Message message = provider.newMessageBuilder().setTopic(topic).setKeys("messageKey").setTag("messageTag").setBody("messageBodyASync".getBytes()).build();producer.sendAsync(message);log.info("Send async message successfully, messageId");return "success";}

消费者代码

@Slf4j
@Component
public class MessageConsumerRunner implements CommandLineRunner {@Overridepublic void run(final String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();String endpoints = "192.168.2.92:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);String consumerGroup = "YourConsumerGroup";String topic = "dataTopic2";PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(consumerGroup).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setMessageListener(messageView -> {log.info("Consume message successfully, messageId={}", messageView.getMessageId());return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);}
}

接口测试

请求接口 /msg/sendAsync
在这里插入图片描述

能够正常收发

问题: broker资源不足无法提供服务

可能出现的客户端报错为:

org.apache.rocketmq.client.java.exception.InternalErrorException: [request-id=e3f9dxxxx1aa872, response-code=50001] org.apache.rocketmq.proxy.common.ProxyException: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL:  0.96 CQ:  0.96 INDEX: -1.00], messages are put to the slave, message store has been shut down, etc.java.util.concurrent.RejectedExecutionException: Task org.apache.rocketmq.shaded.io.grpc.internal.DelayedStream$4@72ba34c2 rejected from java.util.concurrent.ThreadPoolExecutor@7deb0119[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 13]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[na:na]

以上大体就是描述资源不足无法进行接入、服务不可达等,通常就是因为环境的资源不足,可能是内存、可能是硬盘

.../broker/logs/rocketmqlogs/store.log 中可以看出端倪,是磁盘存储不够了

2024-01-08 13:34:24 ERROR StoreScheduledThread1 - physic disk maybe full soon 0.95, so mark disk full, storePathPhysic=/home/rocketmq/store/commitlog

可以通过清除以下数据暂时缓解 .../broker/store/commitlog,可以发现没怎么用也有好多G。不过确实需要使用的话尽早考虑扩容啊

扩大存储增加可用磁盘空间,就能够正常使用连接了

如果这篇文章对你有用的话,帮忙留个关注吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/608788.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

哈希-力扣01两数之和

题目 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元素在答案里不能重复出现。 你可以按任意顺…

spring boot 2升级为spring boot 3中数据库连接池druid的问题

目录 ConfigurationClassPostProcessor ConfigurationClassBeanDefinitionReader MybatisPlusAutoConfiguration ConditionEvaluator OnBeanCondition 总结 近期给了一个任务&#xff0c;要求是对现有的 spring boot 2.x 项目进行升级&#xff0c;由于 spring boot 2.x 版…

35-javascript基础,引入方式;变量命名规范

html分为三部分&#xff1b;结构html&#xff0c;表现css&#xff0c;行为js&#xff1b;js就是javascript js包含三部分&#xff1a; ECMAScript&#xff1a;简称ES&#xff0c;ES5&#xff0c;ES6核心语法 DOM&#xff1a;获取和操作html元素的标准方法&#xff1b;BOM&am…

Linux Capabilities 进阶实战

目录 1. 快速回顾 2. 为可执行文件分配 capabilities 3. 构建半特权环境 4. 容器与 capabilities Linux Capabilities 基础概念与基本使用 上一篇学习了LinuxCapabilities的基础知识和基本使用&#xff0c;因为后面需要学习Docker的逃逸&#xff0c;理解Linux Capabilitie…

忆阻器芯片STELLAR权重更新算法(清华大学吴华强课题组)

参考文献&#xff08;清华大学吴华强课题组&#xff09; Zhang, Wenbin, et al. “Edge learning using a fully integrated neuro-inspired memristor chip.” Science 381.6663 (2023): 1205-1211. STELLAR更新算法原理 在权值更新阶段&#xff0c;只需根据输入、输出和误差…

在python里面探索web框架

一、常识性知识 python Web框架三巨头&#xff1a;Flask&#xff08;简单易学&#xff09;、Django(复杂庞大)、FastAPI 1. Django&#xff1a;Django是一个高级的Web框架&#xff0c;它提供了强大的功能和工具&#xff0c;用于快速开发复杂的Web应用程序。 2. Flask&#xff…

基于SpringBoot使用AOP开发接口的访问日志信息

SpringBoot的AOP原理 Spring Boot的AOP&#xff08;面向切面编程&#xff09;原理是基于动态代理实现的。 在Spring Boot中&#xff0c;AOP通过代理模式对目标对象进行包装&#xff0c;实现在目标对象的方法执行前后增加额外的逻辑。AOP可以在不修改目标对象的情况下&#xf…

BGP公认必遵属性——Origin(二)

BGP公认必遵属性共有三个&#xff0c;分别是&#xff1a;Next-hop、Origin、As-path&#xff0c;本期介绍Origin 点赞关注&#xff0c;持续更新&#xff01;&#xff01;&#xff01; Origin Origin属性用来定义路径信息的来源&#xff0c;只要不被修改&#xff0c;该属性就不…

【Java集合篇】ConcurrentHashMap是如何保证线程安全的

ConcurrentHashMap是如何保证线程安全的 ✔️典型解析✔️ 拓展知识仓✔️ 什么是CAS&#xff08;Compare And Swap&#xff09;✔️CAS和互斥量有什么区别✔️如何使用CAS和互斥量 ✔️CAS和Synchronized的区别✔️ConcurrentHashMap的优缺点✔️能用ConcurrentHashMap实现队列…

python对常见的激活函数绘图操作(详细代码讲解)

写论文的时候需要做一些激活函数的图像&#xff0c;为此将常见的激活函数进行整理汇总了一下&#xff0c;方便后续的复习 激活函数的作用是为让模型处理非线性问题&#xff0c;故次激活函数都是非线性的 生活中&#xff0c;非线性问题占大多数&#xff0c;而模型的训练通常都是…

metartc5_jz源码阅读-yang_send_avpacket

//pushh264中调用此方法将rtp包发送给p2p对端。 int32_t yang_send_avpacket(YangRtcSession *session, YangRtpPacket *pkt, YangBuffer *pbuf) {int32_t err Yang_Ok;//获取到pbuf的size作为要加密的sizeint32_t nn_encrypt yang_buffer_pos(pbuf);//将pbuf中的数据根据seq…

在React里面使用mobx状态管理详细步骤

1、安装MobX和MobX React&#xff1a; 在你的项目目录下运行以下命令安装MobX和MobX React&#xff1a; npm install mobx mobx-react2、创建MobX Store&#xff1a; 创建一个用于管理状态的MobX Store。这个Store应该包含你希望全局管理的状态和相关的操作。以下是一个简单…

flask flask-sqlalchemy sqlit3

这次是数据库使用&#xff0c;拒绝花哨主打就是一个简单 pip install flask-sqlalchemy 调用数据库现在配置里边设置下然后绑上APP后&#xff0c;定义数据结构类.下面是我认为最简单的数据库增删查改结构。 from flask_sqlalchemy import SQLAlchemy app.config[SQLALCHEMY_DAT…

哈希表-散列表数据结构

1、什么是哈希表&#xff1f; 哈希表也叫散列表&#xff0c;哈希表是根据关键码值(key value)来直接访问的一种数据结构&#xff0c;也就是将关键码值(key value)通过一种映射关系映射到表中的一个位置来加快查找的速度&#xff0c;这种映射关系称之为哈希函数或者散列函数&…

Rollup-plugin-bundle-analyzer VS Rollup-plugin-visualizer

分析和可视化Rollup打包后的文件的插件 Rollup-plugin-bundle-analyzerRollup-plugin-visualizer Rollup-plugin-bundle-analyzer和Rollup-plugin-visualizer都是用于分析和可视化Rollup打包后的文件的插件&#xff0c;但它们在功能和使用方式上存在一些差异。 Rollup-plugi…

PostGIS教程学习十九:基于索引的聚簇

PostGIS教程学习十九&#xff1a;基于索引的聚簇 数据库只能以从磁盘获取信息的速度检索信息。小型数据库将完全位于于RAM缓存&#xff08;内存&#xff09;&#xff0c;并摆脱物理磁盘访问速度慢的限制。但是对于大型数据库&#xff0c;对物理磁盘的访问将限制数据库的信息检…

FFmpeg获取音视频流信息

文章目录 前言一、需求二、源码三、运行结果 前言 本文记录用 FFmpeg 获取视频流音频流的信息&#xff08;编码格式、分辨率、帧率、播放时长…&#xff09;&#xff0c;所用的工程基于上个博客编译成功的工程&#xff1a;使用FFmpeg4.3.1的SDK官方开发包编译ffmpeg.c 一、需求…

sqlcmd执行sql文件

可以使用以下命令来在SQL Server中执行SQL脚本文件&#xff08;.sql&#xff09;&#xff1a; sqlcmd -S <服务器名称> -d <数据库名称> -i <脚本文件路径> 其中&#xff0c;<服务器名称>为要连接的 SQL Server 实例的名称或 IP 地址&#xff1b; &l…

掌握 Spring IoC 容器与 Bean 作用域:详解 singleton 与 prototype 的使用与配置

在您的应用程序中&#xff0c;由 Spring IoC 容器管理的形成其核心的对象被称为 "bean"。一个 bean 是由 Spring IoC 容器实例化、组装和管理的对象 这些 bean 是通过您提供给容器的配置元数据创建的。Bean 定义包含了所谓的配置元数据&#xff0c;容器需要了解以下内…

sqlcmd导出sql文件

使用SQLCMD命令行工具可以将数据库中的查询结果导出为SQL文件。 下面是示例代码&#xff1a; sqlcmd -S <服务器名称> -d <数据库名称> -U <用户名> -P <密码> -Q "<查询语句>" -o <输出路径\文件名.sql> 其中&#xff0c;需…