docker-compose安装rocketmq

1. 创建RocketMq文件目录

mkdir rocketmq
mkdir -p rocketmq/brokerconf  rocketmq/logs rocketmq/store

2.创建broker.conf配置文件

vim  brokerconf/broker.conf
# 集群名称
brokerClusterName = DefaultCluster
# broker 名字
brokerName = broker-a
# 0表示master,>0 表示slave
brokerId = 0
# 删除文件的时间点
deleteWhen = 04
# 文件保留时间
fileReservedTime = 48
# Broker 的角色
# # - ASYNC_MASTER 异步复制Master
# # - SYNC_MASTER 同步双写Master
# # - SLAVE
brokerRole = ASYNC_MASTER
# 刷盘方式
# # - ASYNC_FLUSH 异步刷盘
# # - SYNC_FLUSH 同步刷盘
flushDiskType = ASYNC_FLUSH# nameserver地址
namesrvAddr=60.204.149.224:9876
brokerIP1=60.204.149.224

3.创建docker-compose.yml文件

version: '3.8'
services:namesrv:image: apache/rocketmq:5.3.1container_name: rmqnamesrvports:- 9876:9876volumes:- /root/docker/rocketmq/logs:/opt/logs- /root/docker/rocketmq//store:/opt/storenetworks:- rocketmqcommand: sh mqnamesrvbroker:image: apache/rocketmq:5.3.1container_name: rmqbrokerports:- 10909:10909- 10911:10911- 10912:10912volumes:- /root/docker/rocketmq/logs:/opt/logs- /root/docker/rocketmq/store:/opt/store- /root/docker/rocketmq/brokerconf/broker.conf:/etc/rocketmq/broker.confenvironment:- NAMESRV_ADDR=rmqnamesrv:9876- JAVA_OPTS=" -Duser.home=/opt"depends_on:- namesrvnetworks:- rocketmqcommand: sh mqbroker -c /etc/rocketmq/broker.confproxy:image: apache/rocketmq:5.3.1container_name: rmqproxynetworks:- rocketmqdepends_on:- broker- namesrvports:- 8080:8080- 8081:8081restart: on-failureenvironment:- NAMESRV_ADDR=rmqnamesrv:9876command: sh mqproxy
networks:rocketmq:driver: bridge

4.启动rocketmq

 docker-compose -f docker-compose.yml up -d

5.关闭rocketmq

docker-compose down

6.测试

6.1 消息生产者
import static com.doudou.mq.MqConfig.NAME_SRV_ADDR;
import static com.doudou.mq.MqConfig.topic;import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {// 初始化消息生产者DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");// 设置超时时间producer.setSendMsgTimeout(10000);// 指定nameserver地址producer.setNamesrvAddr(NAME_SRV_ADDR);// 启动消息生产者服务producer.start();for (int i = 0; i < 10; i++) {try {Message message = new Message(topic, ("Hello World" + i).getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);System.out.printf("%s%n", result);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
6.2 消息消费者
import static com.doudou.mq.MqConfig.NAME_SRV_ADDR;
import static com.doudou.mq.MqConfig.topic;import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DemoConsumer");consumer.setNamesrvAddr(NAME_SRV_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(topic, "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started");}}
6.3 依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version></dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/888588.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【二分查找】力扣 875. 爱吃香蕉的珂珂

一、题目 二、思路 速度 k&#xff08;单位&#xff1a;根/小时&#xff09;是存在一个取值范围的。 速度越大肯定在规定的时间之内一定会吃完全部的香蕉&#xff0c;但也是可以确定出一个上界的。由于只要保证一小时之内&#xff0c;可以吃完香蕉数目最多的那一堆的香蕉&…

计算机网络-GRE基础实验二

前面我们学习了GRE隧道的建立以及通过静态路由指向的方式使得双方能够网络互联&#xff0c;但是通过静态路由可能比较麻烦&#xff0c;GRE支持组播、单播、广播因此可以在GRE隧道中运行动态路由协议使得网络配置更加灵活。 通过前面的动态路由协议的学习我们知道动态路由协议都…

QNX的内存布局和启动入口

参考资料: QNX官网文档 内存布局 添加图片注释,不超过 140 字(可选) 查看系统内存布局 # pidin syspage=asinfo Header size=0x00000108, Total Size=0x0000d1b0, #Cpu=8, Type=257 Section:asinfo offset:0x0000bdf0 size:0x00000d00 elsize:0x000000200000

重生之我在异世界学智力题(1)

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言智力题题目&#xff1a;《奇怪的时钟…

【优选算法】模拟

目录 一、[替换所有的问号](https://leetcode.cn/problems/replace-all-s-to-avoid-consecutive-repeating-characters/description/)二、[提莫攻击](https://leetcode.cn/problems/teemo-attacking/description/)三、[Z 字形变换](https://leetcode.cn/problems/zigzag-conver…

Pandas | 检查布尔序列函数any() 和 all()的区别

在 Python 中使用 pandas 库时&#xff0c;.any() 和 .all() 是两个用于检查布尔序列&#xff08;如 pandas 的 Series&#xff09;的函数&#xff0c;它们的行为和用途有所不同&#xff1a; 通常用于检查两列元素是否一致或者个别一致的情况 .any(): .any() 函数用于检查序列…

关于HTTP DEBUGGER PRO的DURATION列一点理解

最近在排查一个网络问题的时候&#xff0c;使用了HTTP DEBUGGER PRO进行抓包。发现HTTP DEBUGGER PRO抓包显示的DURATION列的耗时不太对劲&#xff0c;于是排查完网络问题就去看了下这个DURATION列实际所指的耗时&#xff0c;就有了这篇文章。 使用POSTMAN对https://www.rust-l…

Mysql数据库基础篇笔记

目录 sql语句 DDL——数据库定义语言&#xff08;定义库&#xff0c;表&#xff0c;字段&#xff09; 数据库操作&#xff1a; 表操作&#xff1a; DML 增删改语句 DQL 语法编写顺序&#xff1a; 条件查询 DCL 用户管理&#xff1a; 权限管理&#xff1a; 函数 常见字符串内置函…

联想按下“AI加速键”!目标:与5000万中小企业共创

根据相关数据显示&#xff0c;截至2023年末中国中小企业数量超过5300万家&#xff0c;中小企业支撑了中国经济的发展与前进。在AI大模型风潮到来之际&#xff0c;相比于AI带给大企业的长期价值&#xff0c;AI对中小企业有着更加直接、显著、决定性的意义。同时&#xff0c;AI与…

SpringBoot项目启动报错-Slf4j日志相关类找不到

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

SABO-CNN-BiGRU-Attention减法优化器优化卷积神经网络结合双向门控循环单元时间序列预测,含优化前后对比

SABO-CNN-BiGRU-Attention减法优化器优化卷积神经网络结合双向门控循环单元时间序列预测&#xff0c;含优化前后对比 目录 SABO-CNN-BiGRU-Attention减法优化器优化卷积神经网络结合双向门控循环单元时间序列预测&#xff0c;含优化前后对比预测效果基本介绍模型描述程序设计参…

【Java基础】集合

目录 CollectionListSet *重点&#xff1a; 底层机制&#xff08;源码&#xff09;应用场景 好处&#xff1a; 数组&#xff08;长度不可改&#xff0c;同一类型&#xff0c;增删不便&#xff09;集合&#xff08;动态保存&#xff0c;多种类型&#xff0c;方便操作&#xf…

实验3-实时数据流处理-Flink

1.前期准备 &#xff08;1&#xff09;Flink基础环境安装 参考文章&#xff1a; 利用docker-compose来搭建flink集群-CSDN博客 显示为这样就成功了 &#xff08;2&#xff09;把docker&#xff0c;docker-compose&#xff0c;kafka集群安装配置好 参考文章&#xff1a; …

javaweb-Mybaits

1.Mybaits入门 &#xff08;1&#xff09;介绍 &#xff08;2&#xff09; 2.Mybaits VS JDBC 3.数据库连接池 &#xff08;1&#xff09;SpringBoot默认连接池为hikari&#xff0c;切换为Druid有两种方式 方式一&#xff1a;加依赖 方式二&#xff1a;直接修改配置文件 …

Mybatis 关联查询

在 MyBatis 中&#xff0c;关联查询&#xff08;也称为复杂映射&#xff09;是指将多个表的数据通过 SQL 查询和结果映射的方式&#xff0c;组合成一个或多个 Java 对象。这种查询方式用于处理实体之间的关系&#xff0c;如一对一、一对多和多对多关系。通过关联查询&#xff0…

GPS模块/SATES-ST91Z8LR:电路搭建;直接用电脑的USB转串口进行通讯;模组上报定位数据转换地图识别的坐标手动查询地图位置

从事嵌入式单片机的工作算是符合我个人兴趣爱好的,当面对一个新的芯片我即想把芯片尽快搞懂完成项目赚钱,也想着能够把自己遇到的坑和注意事项记录下来,即方便自己后面查阅也可以分享给大家,这是一种冲动,但是这个或许并不是原厂希望的,尽管这样有可能会牺牲一些时间也有哪天原…

分布式光伏电站如何实现监控及集中运维管理?

安科瑞戴婷 Acrel-Fanny 前言 今年以来&#xff0c;在政策利好推动下光伏、风力发电、电化学储能及抽水蓄能等新能源行业发展迅速&#xff0c;装机容量均大幅度增长&#xff0c;新能源发电已经成为新型电力系统重要的组成部分&#xff0c;同时这也导致新型电力系统比传统的电…

大模型分类1—按应用类型

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl根据应用领域,大模型可分为自然语言处理、计算机视觉和多模态大模型。 1. 自然语言处理大模型(NLP) 1.1 应用领域与技术架构 自然语言处理大模型(NLP)的应用领域广泛,包括但不限于文本分类、…

2024 32kstar 的目前最佳开源RAG框架之一的 Langchain-Chatchat开源项目实践(一)

2024 32kstar 的目前最佳开源RAG框架之一的 Langchain-Chatchat开源项目实践&#xff08;一&#xff09; 文章目录 2024 32kstar 的目前最佳开源RAG框架之一的 Langchain-Chatchat开源项目实践&#xff08;一&#xff09;一、前言二、实践步骤1、软硬件要求&#xff08;1&#…

网络安全应急响应流程图

一、网络安全应急响应建设的背景和现状 当前&#xff0c;许多地区和单位已经初步建立了网络安全预警机制&#xff0c;实现了对一般网络安全事件的预警和处置。但是&#xff0c;由于网络与信息安全技术起步相对较晚&#xff0c;发展时间较短&#xff0c;与其他行业领域相比&…