RocketMQ常用基本操作

文章中的rabbitmq使用的是rocketmq-all-5.1.3-bin-release版本,需要安装包的可自行下载

RockerMQ启动停止命令

启动命令

nohup sh bin/mqnamesrv &

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

查看日志

tail -f ~/logs/rocketmqlogs/namesrv.log

tail -f ~/logs/rocketmqlogs/proxy.log

停止命令

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

集群状态

sh mqadmin clusterList -n 127.0.0.1:9876

创建topic

sh mqadmin updateTopic -n 127.0.0.1:9876 rocket_test

查看所有topic信息

sh mqadmin topicList -n 127.0.0.1:9876

sh mqadmin topicList -n 127.0.0.1:9876 -c

查看 Topic 路由信息

sh mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest

发送测试消息

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

消费消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

Java代码收发消息

Producer

package com.rocket.demo;

import com.alibaba.fastjson.JSON;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.HashMap;

import java.util.Map;

public class RocketProducerDemo {

    private final static String nameServer = "127.0.0.1:9876";

    private final static String producerGroup = "my_group2";

    // debezium-mysql-source-topic topic-test

    private final static String topic = "TopicTest";

    public static void main(String[] args) {

        try {

            // 初始化一个producer并设置Producer group name

            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

//            DefaultMQProducer producer = new DefaultMQProducer();

            // 设置NameServer地址

            producer.setNamesrvAddr(nameServer);

            // 启动producer

            producer.start();

            for (int i = 0; i < 100; i++) {

                Map<String, String> data = new HashMap();

                data.put("id", i+"");

                data.put("name", i+","+System.currentTimeMillis());

                // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤

                Message msg = new Message("TopicTest", "tagA", JSON.toJSONString(data).getBytes(RemotingHelper.DEFAULT_CHARSET));

                // 利用producer进行发送,并同步等待发送结果

                SendResult sendResult = producer.send(msg, 10000);

                System.out.println(sendResult);

            }

            // 一旦producer不再使用,关闭producer

            producer.shutdown();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

Consumer

package com.rocket.demo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketConsumerDemo {

    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group");

        consumer.setNamesrvAddr("localhost:9876");

        // debezium-mysql-source-topic  topic-test debezium-mysql-source db-history-debezium-topic debezium-mysql-source

        consumer.subscribe("TopicTest", "*"); // 订阅主题和标签,* 表示订阅所有标签

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {

                for (MessageExt message : messages) {

                    System.out.println("Received message: " + new String(message.getBody()));

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }

        });

        consumer.start();

        System.out.println("Consumer started");

    }

}

常见问题

service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.96 CQ: 0.96 INDEX: 0.96], messages are put to the slave, message store has been shut down

错误原因:博主测试的服务器磁盘使用率到0.96了,rocketmq不允许磁盘超过0.9,清理下磁盘数据即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/38563.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多线程编程的挑战与解决方案

多线程编程的挑战与解决方案 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 1. 多线程编程的挑战 在现代软件开发中&#xff0c;多线程编程成为处理并发任务…

PatchTST创新点

这篇论文的创新点主要集中在PatchTST模型的设计和应用中。以下是对其创新点的详细说明&#xff1a; 创新点 频道独立补丁设计&#xff1a;PatchTST模型通过将多变量时间序列分割成不同的频道&#xff0c;每个频道作为单变量时间序列处理。每个频道独立地通过实例归一化操作和补…

明星中药企业系列洞察(九)一手好牌打的稀烂!近500年老字号锁定退市,太安堂为何“塌房”了?

近日&#xff0c;太安堂发布公告称&#xff0c;公司已收到深交所下发的《关于广东太安堂药业股份有限公司股票终止上市的决定》&#xff0c;深交所决定终止公司股票上市&#xff0c;预计其最后交易日期为7月4日。太安堂曾作为国内知名的中成药上市公司之一&#xff0c;是国家级…

matlab仿真 通信信号和系统分析(上)

&#xff08;内容源自详解MATLAB&#xff0f;SIMULINK 通信系统建模与仿真 刘学勇编著第三章内容&#xff0c;有兴趣的读者请阅读原书&#xff09; 一、求离散信号卷积和 主要还是使用卷积函数conv&#xff0c;值得注意的是&#xff0c;得到的卷积和长度结果为81&#xff0…

node.js+uniapp(vue),阿里云短信验证码

reg.vue: 思路是&#xff1a;前端调用获取验证码的接口 > 后端生成验证码返回给前端 > 前端渲染验证码 <template> <div> <input class"sl-input" v-model"phone" type"tel" maxlength"11" placeholder"手…

微信小程序毕业设计-微信食堂线上订餐系统项目开发实战(附源码+论文)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;微信小程序毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计…

【在线评论】不同视角下在线评论对客户满意度和推荐度的影响—推文分析—2024-07-01

今天的推文主题是【在线评论】&#xff0c;重点关注可以关注第四篇&#xff0c;很全面地分析了在线评论的信息多维性。 第一篇从客户的在线评论入手&#xff0c;将客户消费的动机为功利、享受、社会满足&#xff1b;第二篇是关于在线评论对消费者再次选择同一家酒店的机制探索…

MySQL之主从同步、分库分表

1、主从同步的原理 MySQL主从复制的核心是二进制日志 二进制日志&#xff08;binlog&#xff09;记录了所有DDL语句和DML语句&#xff0c;但不包括数据查询&#xff08;select、show&#xff09;语句。 1.1、复制分三步 master主库在事务提交时&#xff0c;会把数据变更记录…

电子战学习笔记01:电子战概论

0、写在文前 本人在学习电子战相关理论知识时&#xff0c;一直感觉无从下手&#xff0c;之后在老师的推荐下购买了《EW101&#xff1a;电子战基础》纸质书籍学习&#xff0c;所以将自己的学习笔记在CSDN上记录一下&#xff0c;也供有需要的同学参考。 1、电子战定义 电子战&…

Spring Boot与Apache Kafka集成的深度指南

Spring Boot与Apache Kafka集成的深度指南 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在现代分布式系统中&#xff0c;消息队列的作用愈发重要&#xff0…

【鸿蒙学习笔记】鸿蒙ArkTS学习笔记

应用开发导读&#xff1a;https://developer.huawei.com/consumer/cn/doc/harmonyos-guides-V5/application-dev-guide-V5 【鸿蒙培训】第&#xff11;天・环境安装 【鸿蒙培训】第&#xff12;天・装饰器・组件和页面生命周期 【鸿蒙学习笔记】数据类型 【鸿蒙学习笔记】运算…

Spring Cloud中的服务发现与注册

Spring Cloud中的服务发现与注册 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将探讨Spring Cloud中的服务发现与注册&#xff0c;这是微服务架构中至…

全网最详细的 gin框架请求数据绑定Bind 源码解析 -- 帮助你全面了解gin框架的请求数据绑定原理和方法

在gin框架中&#xff0c;我们可以将多种请求数据&#xff08;json, form,uri&#xff0c;header等&#xff09;直接绑定到我们定义的结构体&#xff0c;底层是通过反射方式获取我们定义在结构体上面的tag来实现请求数据到我们的结构体数据的绑定的。 在gin的底层有2大体系的数据…

Python pip install模块时C++编译环境问题

pip install模块时C编译环境问题 在接触和使用python后&#xff0c;常常会通过pip install命令安装第三方模块&#xff0c;大多数模块可以直接安装&#xff0c;但许多新同学仍会遇见某些模块需要实时编译后才能安装&#xff0c;如报错信息大概是缺乏C编译环境&#xff0c;本文则…

【Elasticsearch】Elasticsearch索引创建与管理详解

文章目录 &#x1f4d1;引言一、Elasticsearch 索引的基础概念二、创建索引2.1 使用默认设置创建索引2.2 自定义设置创建索引2.3 创建索引并设置映射 三、索引模板3.1 创建索引模板3.2 使用索引模板创建索引 四、管理索引4.1 查看索引4.2 更新索引设置4.3 删除索引 五、索引别名…

Go-知识测试-性能测试

Go-知识测试-性能测试 1. 定义2. 例子3. testing.common 测试基础数据4. testing.TB 接口5. 关键函数5.1 testing.runBenchmarks5.2 testing.B.runN5.3 testing.B.StartTimer5.4 testing.B.StopTimer5.5 testing.B.ResetTimer5.6 testing.B.Run5.7 testing.B.run15.8 testing.B…

监听蓝牙对话的BlueSpy技术复现

本文是之前文章的BlueSpy技术的复现过程&#xff1a;https://mp.weixin.qq.com/s/iCeImLLPAwwKH1avLmqEpA 2个月前&#xff0c;网络安全和情报公司Tarlogic在西班牙安全大会RootedCon 2024上提出了一项利用蓝牙漏洞的BlueSpy技术&#xff0c;并在之后发布了一个名为BlueSpy的概…

深度学习之生成对抗网络StyleGAN3

StyleGAN3 是由 NVIDIA 团队提出的第三代生成对抗网络(GAN),在前代 StyleGAN 和 StyleGAN2 的基础上进行了改进,以实现更高质量的图像生成。StyleGAN3 的主要改进在于解决了 StyleGAN2 中存在的伪影(artifacts)问题,并且提升了生成图像的一致性和稳定性。 StyleGAN3 的…

git 提交代码忽略eslint代码检测

在暂存代码的时候会出现以上情况因为在提交代码的时候会默认运行代码进行检测&#xff0c;如果不符合代码规范就会进行报错 解决&#xff1a; 使用 git commit --no-verify -m xxx 忽略eslint的检测

Laravel 谨慎使用Storage::append()

在 driver 为 local 时&#xff0c;Storage::append()在高并发下&#xff0c;会存在丢失数据问题&#xff0c;文件被覆写&#xff0c;而非尾部添加&#xff0c;如果明确是本地文件操作&#xff0c;像日志写入&#xff0c;建议使用 Illuminate\Filesystem\Filesystem或者php原生…