RocketMQ常用基本操作

文章中的rabbitmq使用的是rocketmq-all-5.1.3-bin-release版本,需要安装包的可自行下载

RockerMQ启动停止命令

启动命令

nohup sh bin/mqnamesrv &

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

查看日志

tail -f ~/logs/rocketmqlogs/namesrv.log

tail -f ~/logs/rocketmqlogs/proxy.log

停止命令

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

集群状态

sh mqadmin clusterList -n 127.0.0.1:9876

创建topic

sh mqadmin updateTopic -n 127.0.0.1:9876 rocket_test

查看所有topic信息

sh mqadmin topicList -n 127.0.0.1:9876

sh mqadmin topicList -n 127.0.0.1:9876 -c

查看 Topic 路由信息

sh mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest

发送测试消息

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

消费消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

Java代码收发消息

Producer

package com.rocket.demo;

import com.alibaba.fastjson.JSON;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.HashMap;

import java.util.Map;

public class RocketProducerDemo {

    private final static String nameServer = "127.0.0.1:9876";

    private final static String producerGroup = "my_group2";

    // debezium-mysql-source-topic topic-test

    private final static String topic = "TopicTest";

    public static void main(String[] args) {

        try {

            // 初始化一个producer并设置Producer group name

            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

//            DefaultMQProducer producer = new DefaultMQProducer();

            // 设置NameServer地址

            producer.setNamesrvAddr(nameServer);

            // 启动producer

            producer.start();

            for (int i = 0; i < 100; i++) {

                Map<String, String> data = new HashMap();

                data.put("id", i+"");

                data.put("name", i+","+System.currentTimeMillis());

                // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤

                Message msg = new Message("TopicTest", "tagA", JSON.toJSONString(data).getBytes(RemotingHelper.DEFAULT_CHARSET));

                // 利用producer进行发送,并同步等待发送结果

                SendResult sendResult = producer.send(msg, 10000);

                System.out.println(sendResult);

            }

            // 一旦producer不再使用,关闭producer

            producer.shutdown();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

Consumer

package com.rocket.demo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketConsumerDemo {

    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group");

        consumer.setNamesrvAddr("localhost:9876");

        // debezium-mysql-source-topic  topic-test debezium-mysql-source db-history-debezium-topic debezium-mysql-source

        consumer.subscribe("TopicTest", "*"); // 订阅主题和标签,* 表示订阅所有标签

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {

                for (MessageExt message : messages) {

                    System.out.println("Received message: " + new String(message.getBody()));

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }

        });

        consumer.start();

        System.out.println("Consumer started");

    }

}

常见问题

service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.96 CQ: 0.96 INDEX: 0.96], messages are put to the slave, message store has been shut down

错误原因:博主测试的服务器磁盘使用率到0.96了,rocketmq不允许磁盘超过0.9,清理下磁盘数据即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/38563.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

明星中药企业系列洞察(九)一手好牌打的稀烂!近500年老字号锁定退市,太安堂为何“塌房”了?

近日&#xff0c;太安堂发布公告称&#xff0c;公司已收到深交所下发的《关于广东太安堂药业股份有限公司股票终止上市的决定》&#xff0c;深交所决定终止公司股票上市&#xff0c;预计其最后交易日期为7月4日。太安堂曾作为国内知名的中成药上市公司之一&#xff0c;是国家级…

matlab仿真 通信信号和系统分析(上)

&#xff08;内容源自详解MATLAB&#xff0f;SIMULINK 通信系统建模与仿真 刘学勇编著第三章内容&#xff0c;有兴趣的读者请阅读原书&#xff09; 一、求离散信号卷积和 主要还是使用卷积函数conv&#xff0c;值得注意的是&#xff0c;得到的卷积和长度结果为81&#xff0…

node.js+uniapp(vue),阿里云短信验证码

reg.vue: 思路是&#xff1a;前端调用获取验证码的接口 > 后端生成验证码返回给前端 > 前端渲染验证码 <template> <div> <input class"sl-input" v-model"phone" type"tel" maxlength"11" placeholder"手…

微信小程序毕业设计-微信食堂线上订餐系统项目开发实战(附源码+论文)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;微信小程序毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计…

【在线评论】不同视角下在线评论对客户满意度和推荐度的影响—推文分析—2024-07-01

今天的推文主题是【在线评论】&#xff0c;重点关注可以关注第四篇&#xff0c;很全面地分析了在线评论的信息多维性。 第一篇从客户的在线评论入手&#xff0c;将客户消费的动机为功利、享受、社会满足&#xff1b;第二篇是关于在线评论对消费者再次选择同一家酒店的机制探索…

MySQL之主从同步、分库分表

1、主从同步的原理 MySQL主从复制的核心是二进制日志 二进制日志&#xff08;binlog&#xff09;记录了所有DDL语句和DML语句&#xff0c;但不包括数据查询&#xff08;select、show&#xff09;语句。 1.1、复制分三步 master主库在事务提交时&#xff0c;会把数据变更记录…

电子战学习笔记01:电子战概论

0、写在文前 本人在学习电子战相关理论知识时&#xff0c;一直感觉无从下手&#xff0c;之后在老师的推荐下购买了《EW101&#xff1a;电子战基础》纸质书籍学习&#xff0c;所以将自己的学习笔记在CSDN上记录一下&#xff0c;也供有需要的同学参考。 1、电子战定义 电子战&…

全网最详细的 gin框架请求数据绑定Bind 源码解析 -- 帮助你全面了解gin框架的请求数据绑定原理和方法

在gin框架中&#xff0c;我们可以将多种请求数据&#xff08;json, form,uri&#xff0c;header等&#xff09;直接绑定到我们定义的结构体&#xff0c;底层是通过反射方式获取我们定义在结构体上面的tag来实现请求数据到我们的结构体数据的绑定的。 在gin的底层有2大体系的数据…

Python pip install模块时C++编译环境问题

pip install模块时C编译环境问题 在接触和使用python后&#xff0c;常常会通过pip install命令安装第三方模块&#xff0c;大多数模块可以直接安装&#xff0c;但许多新同学仍会遇见某些模块需要实时编译后才能安装&#xff0c;如报错信息大概是缺乏C编译环境&#xff0c;本文则…

【Elasticsearch】Elasticsearch索引创建与管理详解

文章目录 &#x1f4d1;引言一、Elasticsearch 索引的基础概念二、创建索引2.1 使用默认设置创建索引2.2 自定义设置创建索引2.3 创建索引并设置映射 三、索引模板3.1 创建索引模板3.2 使用索引模板创建索引 四、管理索引4.1 查看索引4.2 更新索引设置4.3 删除索引 五、索引别名…

Go-知识测试-性能测试

Go-知识测试-性能测试 1. 定义2. 例子3. testing.common 测试基础数据4. testing.TB 接口5. 关键函数5.1 testing.runBenchmarks5.2 testing.B.runN5.3 testing.B.StartTimer5.4 testing.B.StopTimer5.5 testing.B.ResetTimer5.6 testing.B.Run5.7 testing.B.run15.8 testing.B…

监听蓝牙对话的BlueSpy技术复现

本文是之前文章的BlueSpy技术的复现过程&#xff1a;https://mp.weixin.qq.com/s/iCeImLLPAwwKH1avLmqEpA 2个月前&#xff0c;网络安全和情报公司Tarlogic在西班牙安全大会RootedCon 2024上提出了一项利用蓝牙漏洞的BlueSpy技术&#xff0c;并在之后发布了一个名为BlueSpy的概…

git 提交代码忽略eslint代码检测

在暂存代码的时候会出现以上情况因为在提交代码的时候会默认运行代码进行检测&#xff0c;如果不符合代码规范就会进行报错 解决&#xff1a; 使用 git commit --no-verify -m xxx 忽略eslint的检测

Laravel 谨慎使用Storage::append()

在 driver 为 local 时&#xff0c;Storage::append()在高并发下&#xff0c;会存在丢失数据问题&#xff0c;文件被覆写&#xff0c;而非尾部添加&#xff0c;如果明确是本地文件操作&#xff0c;像日志写入&#xff0c;建议使用 Illuminate\Filesystem\Filesystem或者php原生…

邀请函 | 极限科技全新搜索引擎 INFINI Pizza 亮相 2024 可信数据库发展大会!

过去一年&#xff0c;在全球 AI 浪潮和国家数据局成立的推动下&#xff0c;数据库产业变革不断、热闹非凡。2024 年&#xff0c;站在中国数字经济产业升级和数据要素市场化建设的时代交汇点上&#xff0c;“2024 可信数据库发展大会” 将于 2024 年 7 月 16-17 日在北京悠唐皇冠…

肆拾玖坊的商业模式,49坊新零售奖金制度体系,众筹众创+会员制

肆拾玖坊之所以能够在短时间内成为白酒行业的“现象级”企业,,不仅是依靠独特商业模式,同时也依靠的是坚持用户为核心,围绕用户需求,让用户与产品直接产生连接理念。 坐标&#xff1a;厦门&#xff0c;我是易创客肖琳 深耕社交新零售行业10年&#xff0c;主要提供新零售系统工…

前端技术(二)——javasctipt 介绍

一、javascript基础 1. javascript简介 ⑴ javascript的起源 ⑵ javascript 简史 ⑶ javascript发展的时间线 ⑷ javascript的实现 ⑸ js第一个代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>…

Vue中的axios深度探索:从基础安装到高级功能应用的全面指南

文章目录 前言一、axios 请求1. axios的概念2. axios的安装3. axiso请求方式介绍4. axios请求本地数据5. axios跨域6. axios全局注册7. axios支持的请求类型1&#xff09;get请求2&#xff09;post请求3&#xff09;put请求4&#xff09;patch请求5&#xff09;delete请求 二、…

MyBatis操作数据库(入门)

本节目标 使用MyBatis完成简单的增删改查操作&#xff0c;参数传递掌握MyBatis的两种写法&#xff1a;注解和XML方式掌握MyBatis相关的日志配置 前言 在应用分层学习中&#xff0c;我们了解web应用程序一般分为三层&#xff0c;即Controller、Service、Dao。在之前的案例中&a…

化学SCI期刊,中科院4区,易录用,几乎不退稿

一、期刊名称 Chemical Papers 二、期刊简介概况 期刊类型&#xff1a;SCI 学科领域&#xff1a;化学 影响因子&#xff1a;2.1 中科院分区&#xff1a;4区 三、期刊征稿范围 该杂志致力于基础和应用化学和化学工程研究。它的范围很广&#xff0c;涵盖了所有化学科学&…