Go微服务: 基于rocketmq:server和rocketmq:broker搭建RocketMQ环境,以及生产消息和延迟消费消息的实现

RocketMQ 的搭建


1 ) 配置 docker-compose.yaml 文件

version: '3.5'
services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rmqbrokerports:- "10909:10909"- "10911:10911"volumes:- ./logs:/opt/logs- ./store:/opt/store- ./conf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- "8080:8080"environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge

2 ) 配置文件 conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
brokerIP1=192.168.124.6
defaultTopicQueueNums = 4
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
listenPort = 10911
deleteWhen = 04
fileReservedTime = 120
mapedfileSizeCommitLog = 1073741824
mapedfileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio = 88
maxMessageSize=65536
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
  • 注意,需要指定 brokerIP1 且不能使用 0.0.0.0 也不能不指定,否则无法通信
  • fileReservedTime 默认是 48h

3 ) 拉取镜像

  • $ docker pull foxiswho/rocketmq:server
  • $ docker pull foxiswho/rocketmq:broker
  • $ docker pull styletang/rocketmq-console-ng

4 )启动和检查

  • 启动 $ docker compose up -d
  • 检查状态 $ docker compose ps

打开 UI 界面验证

  • 访问:http://127.0.0.1:8080

上面这个就是和上面的 brokerIP1 对应

编写程序验证生产和消费消息

  • 现在简述下场景
    • 生产5条消息
    • 10s 后进行消费

代码实现

package mainimport ("context""fmt""log""os""strconv""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)const groupName = "BBS_SHOP_GROUP_123"func GetMqAddr() string {mqAddr := "127.0.0.1:9876" // 这里填入你的 NameServer 端口return mqAddr
}func ProduceMsg(mqAddr string, topic string) {p, err := rocketmq.NewProducer( // 普通消息生产者producer.WithGroupName(groupName),producer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),producer.WithRetry(2),)if err != nil {panic(err)}err = p.Start()if err != nil {log.Fatal()fmt.Println("生产者错误: %v", err.Error())os.Exit(1)}for i := 0; i < 5; i++ {msg := &primitive.Message{Topic: topic,Body:  []byte("Hello XProjectOrder " + strconv.Itoa(i)),}msg.WithDelayTimeLevel(3)r, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Println("发送消息错误: %v", err.Error())} else {fmt.Println("生产消息成功: " + r.String() + "-" + r.MsgID)}}err = p.Shutdown()if err != nil {fmt.Println("生产者shutdown: %v", err.Error())os.Exit(1)}
}func ComsumeMsg(mqAddr string, topic string) {c, err := rocketmq.NewPushConsumer(consumer.WithGroupName(groupName),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),)if err != nil {panic(err)}err = c.Subscribe(topic, consumer.MessageSelector{},func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgList {fmt.Printf("订阅消息,消费%v \n", msgList[i])}return consumer.ConsumeSuccess, nil})if err != nil {fmt.Println("消费消息错误: %v", err.Error())}err = c.Start()if err != nil {fmt.Println("开启消费这错误: %v", err.Error())}time.Sleep(time.Hour)err = c.Shutdown()if err != nil {fmt.Println("shutdown消费者错误: %v", err.Error())}
}func main() {topic := "BBS_SHOP_TOPIC_123"mqAddr := GetMqAddr()ProduceMsg(mqAddr, topic)ComsumeMsg(mqAddr, topic)
}
  • 以上是一个 demo,在真实场景,自行进行封装处理

  • 这里定义了一个 主题 BBS_SHOP_TOPIC_123 和 一个订阅组 BBS_SHOP_GROUP_123

  • 查看生产消息输出

    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80001, offsetMsgId=C0A87C0600002A9F0000000000000000, queueOffset=0, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80001
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80002, offsetMsgId=C0A87C0600002A9F00000000000000DE, queueOffset=1, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]]-C0A87C06209F0000000017eef7e80002
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80003, offsetMsgId=C0A87C0600002A9F00000000000001BC, queueOffset=2, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]]-C0A87C06209F0000000017eef7e80003
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80004, offsetMsgId=C0A87C0600002A9F000000000000029A, queueOffset=3, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]]-C0A87C06209F0000000017eef7e80004
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80005, offsetMsgId=C0A87C0600002A9F0000000000000378, queueOffset=4, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80005
    
  • 查看消费 (10s 之后)

    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 0, Flag=0, properties=map[CONSUME_START_TIME:1717572747702 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80001], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80001, OffsetMsgId=C0A87C0600002A9F0000000000000456,QueueId=1, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737685, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747694, StoreHost=192.168.124.6:10911, CommitLogOffset=1110, BodyCRC=407480418, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 1, Flag=0, properties=map[CONSUME_START_TIME:1717572747706 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:2 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80002], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80002, OffsetMsgId=C0A87C0600002A9F0000000000000533,QueueId=2, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737698, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747701, StoreHost=192.168.124.6:10911, CommitLogOffset=1331, BodyCRC=1867421940, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 2, Flag=0, properties=map[CONSUME_START_TIME:1717572747710 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:3 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80003], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80003, OffsetMsgId=C0A87C0600002A9F0000000000000610,QueueId=3, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737702, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747704, StoreHost=192.168.124.6:10911, CommitLogOffset=1552, BodyCRC=1984416078, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]" consumerGroup=BBS_SHOP_GROUP_123 offset=0
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 3, Flag=0, properties=map[CONSUME_START_TIME:1717572747713 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:0 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80004], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80004, OffsetMsgId=C0A87C0600002A9F00000000000006ED,QueueId=0, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737706, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747709, StoreHost=192.168.124.6:10911, CommitLogOffset=1773, BodyCRC=21035480, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 4, Flag=0, properties=map[CONSUME_START_TIME:1717572747717 DELAY:3 MAX_OFFSET:2 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80005], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80005, OffsetMsgId=C0A87C0600002A9F00000000000007CA,QueueId=1, StoreSize=221, QueueOffset=1, SysFlag=0, BornTimestamp=1717572737709, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747711, StoreHost=192.168.124.6:10911, CommitLogOffset=1994, BodyCRC=522480763, ReconsumeTimes=0, PreparedTransactionOffset=0]
    
  • 以上就是生产和消费的主要过程

效果

总结

  • 以上是简单的环境搭建和生产消息,以及延迟消费消息的 demo 示例
  • 实际场景中,结合以上demo,对一些异步发送消息的场景进行灵活运用和升级

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/25059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[HGAME 2023 week4]shellcode

看题目&#xff0c;将base64解密&#xff0c;然后dump下来&#xff0c;再拉进ida里&#xff0c;发现为tea加密 在tea加密中得到key 密文就是另外的一个文件 exp import re from ctypes import *import libnumdef decrypt(v, k):v0, v1 c_uint32(v[0]), c_uint32(v[1])delta…

【设计模式】行为型设计模式之 策略模式学习实践

介绍 策略模式&#xff08;Strategy&#xff09;&#xff0c;就是⼀个问题有多种解决⽅案&#xff0c;选择其中的⼀种使⽤&#xff0c;这种情况下我们 使⽤策略模式来实现灵活地选择&#xff0c;也能够⽅便地增加新的解决⽅案。⽐如做数学题&#xff0c;⼀个问题的 解法可能有…

如何拼接全景图?PTGui Pro macOS安装包

PTGui Pro是一款功能强大的全景图像拼接软件&#xff0c;特别适合专业摄影师和设计师使用。它能够将多张照片拼接成高质量的全景图&#xff0c;支持普通、圆柱和球形等多种全景模式。软件提供了自动图像拼接和手动模式&#xff0c;用户可根据需求灵活选择。同时&#xff0c;PTG…

在家AIAA(美国航空航天学会)文献如何查找下载

今天有位同学的求助文献来自AIAA&#xff08;美国航空航天学会&#xff09;&#xff0c;下面就讲一下不用求助他人自己就可搞定文献下载的途径并实例操作演示。 首先我们先对AIAA&#xff08;美国航空航天学会&#xff09;数据库做个简单的了解&#xff1a; 美国航空航天学会…

使用汇编和proteus实现仿真数码管显示电路

proteus介绍&#xff1a; proteus是一个十分便捷的用于电路仿真的软件&#xff0c;可以用于实现电路的设计、仿真、调试等。并且可以在对应的代码编辑区域&#xff0c;使用代码实现电路功能的仿真。 汇编语言介绍&#xff1a; 百度百科介绍如下&#xff1a; 汇编语言是培养…

Windows UAC权限详解以及因为权限不对等引发的若干问题排查

目录 1、什么是UAC&#xff1f; 2、微软为什么要设计UAC&#xff1f; 3、标准用户权限与管理员权限 4、程序到底以哪种权限运行&#xff1f;与哪些因素有关&#xff1f; 4.1、给程序设置以管理员权限运行的属性 4.2、当前登录用户的类型 5、案例1 - 无法在企业微信聊天框…

API测试工具

apifox 微信扫描登录 不推荐&#xff1a; Download Postman

CorelDraw安装时界面显示不全的解决方案

问题原因&#xff1a;安装包权限 解决方案&#xff1a; 1、安装包解压后&#xff0c;找到Setup文件&#xff0c;复制粘贴到当前文件夹并重命名为Getup后&#xff0c;右击Getup文件&#xff0c;选择“以管理员身份运行” 说明&#xff1a;除了命名Gsetup。还可以命名为其他的…

Vue第三方库与插件实战手册

title: Vue第三方库与插件实战手册 date: 2024/6/8 updated: 2024/6/8 excerpt: 这篇文章介绍了如何在Vue框架中实现数据的高效验证与处理&#xff0c;以及如何集成ECharts、D3.js、Chart.js等图表库优化数据可视化效果。同时&#xff0c;探讨了Progressive Web App(PWA)的接入…

MySQL-相关日志

官方文档 1、MySQL支持的日志 MySQL有不同类型日志文件&#xff0c;用来存储不同类型的日志&#xff0c;分别为 二进制日志、错误日志、通用查询日志、慢查询日志、中继日志、数据定义语句日志 慢查询日志&#xff1a;记录所有执行时间超过 long_query_time的所有查询&#xf…

攻防世界---misc---Excaliflag

1、题目描述&#xff0c;下载附件是一张图片 2、用winhex分析&#xff0c;没有发现奇怪的地方 3、在kali中使用binwalk -e 命令&#xff0c;虽然分离出来了一些东西&#xff0c;但是不是有用的 4、最后用stegsolve分析&#xff0c;切换图片&#xff0c;发现有字符串&#xff0c…

Apache IoTDB 分布式架构三部曲(三)副本与共识算法

IoTDB 首创并应用的共识协议统一框架&#xff0c;为用户提供了灵活选择不同共识算法的可能性。 对于一个分布式集群而言&#xff0c;为了使得海量数据场景下集群能够横向扩展&#xff0c;集群需要按照一定的规则将全部数据分成多个子集存储在不同的节点上&#xff0c;从而能够更…

CNN简介与实现

CNN简介与实现 导语整体结构卷积层卷积填充步幅三维卷积立体化批处理 实现 池化层特点实现 CNN实现可视化总结参考文献 导语 CNN全称卷积神经网络&#xff0c;可谓声名远扬&#xff0c;被用于生活中的各个领域&#xff0c;也是最好理解的神经网络结构之一。 整体结构 相较于…

GUI编程-01

组件 窗口 弹窗 面板 文本框 列表框 按钮 图片 监听事件 鼠标 键盘事件 破解工具 Java提供了丰富的图形用户界面&#xff08;Graphics User Interface&#xff0c;GUI&#xff09;的类库&#xff0c;基于这些类库可以编写窗口程序。 Java关于图形界面的类库主要放在…

171.二叉树:二叉树的所有路径(力扣)

代码解决 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(nullptr, right(nullptr) {}* Tree…

LabVIEW伺服电机测控系统

LabVIEW伺服电机测控系统 开发了一个基于LabVIEW的伺服电机测控系统。系统主要用于精确控制电机的运动&#xff0c;以达到高效率和高精度的要求。通过使用LabVIEW软件和配套的硬件&#xff0c;开发者能够实现对伺服电机的实时监控和控制&#xff0c;进而提高整个系统的性能和可…

【Linux系统编程】进程地址空间

目录 前言 进程虚拟地址空间的引入 进程地址空间的概念 进一步理解进程地址空间 为什么需要进程地址空间&#xff1f; 系统层面理解malloc/new内存申请 前言 首先&#xff0c;在我们学习C语言的时候一定会见过如下这张图。&#xff08;没见过也没关系&#xff0c;接下来…

【JavaScript对象详解】 Day05

JavaScript对象详解 JavaScript 基础 - 第5天对象语法对象属性对象使用属性-查属性-改属性-增属性-删 &#xff08;了解&#xff09; 方法和调用遍历对象遍历数组对象null 内置对象Math属性方法生成任意范围随机数 综合案例随机点名案例猜数字游戏猜数字游戏设定次数生成随机颜…

运维 之 DNS域名解析

前言 我们每天打开的网站&#xff0c;他是如何来解析&#xff0c;并且我们怎么能得到网站的内容反馈的界面呢&#xff1f;那什么是DNS呢&#xff08;DNS&#xff08;DomainNameservice&#xff0c;域名服务&#xff0c;主要用于因特网上作为域名和IP地址相互映射&#xff09;那…

React的表单学习

react的表单的双向绑定 // userState实现计数实例 import {useState} from react// 1.声明一个react的状态 -useState// 2.核心绑定流程//1.通过value属性绑定react状态//2.绑定onChange事件&#xff0c;通过事件参数e拿到输入框最新的值&#xff0c;反向修改到react状态 func…