RocketMQ 的搭建
1 ) 配置 docker-compose.yaml 文件
# Compose stack for a single-node RocketMQ setup: NameServer, one broker,
# and the web console, all attached to the "rmq" bridge network.
version: '3.5'

services:
  # NameServer: the broker and the console both reach it as rmqnamesrv:9876.
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - "9876:9876"
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
    networks:
      rmq:
        aliases:
          - rmqnamesrv

  # Broker: started with the config file mounted from ./conf/broker.conf.
  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - "10909:10909"
      - "10911:10911"
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
      - ./conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPTS: " -Duser.home=/opt"
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  # Web console, published on host port 8080.
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - "8080:8080"
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge
2 ) 配置文件 conf/broker.conf
# Broker settings; docker-compose mounts this file into the rmqbroker
# container as /etc/rocketmq/broker.conf and passes it via `mqbroker -c`.
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# Must be set explicitly and must not be 0.0.0.0, otherwise communication
# with the broker fails. NOTE(review): 192.168.124.6 is presumably the
# author's host IP - replace it with your own.
brokerIP1=192.168.124.6
defaultTopicQueueNums = 4
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
# Same port that docker-compose publishes as "10911:10911".
listenPort = 10911
deleteWhen = 04
# The default is 48 (hours); raised to 120 here.
fileReservedTime = 120
mapedfileSizeCommitLog = 1073741824
mapedfileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio = 88
maxMessageSize=65536
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
- 注意:必须显式指定 brokerIP1,不能使用 0.0.0.0,也不能不指定,否则无法通信
- fileReservedTime 默认是 48h
3 ) 拉取镜像
- $ docker pull foxiswho/rocketmq:server
- $ docker pull foxiswho/rocketmq:broker
- $ docker pull styletang/rocketmq-console-ng
4 ) 启动和检查
- 启动 $ docker compose up -d
- 检查状态 $ docker compose ps
打开 UI 界面验证
- 访问:http://127.0.0.1:8080
界面中显示的 broker 地址就是上面 broker.conf 里配置的 brokerIP1
编写程序验证生产和消费消息
- 现在简述下场景
- 生产5条消息
- 10s 后进行消费
代码实现
package mainimport ("context""fmt""log""os""strconv""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)const groupName = "BBS_SHOP_GROUP_123"func GetMqAddr() string {mqAddr := "127.0.0.1:9876" // 这里填入你的 NameServer 端口return mqAddr
}func ProduceMsg(mqAddr string, topic string) {p, err := rocketmq.NewProducer( // 普通消息生产者producer.WithGroupName(groupName),producer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),producer.WithRetry(2),)if err != nil {panic(err)}err = p.Start()if err != nil {log.Fatal()fmt.Println("生产者错误: %v", err.Error())os.Exit(1)}for i := 0; i < 5; i++ {msg := &primitive.Message{Topic: topic,Body: []byte("Hello XProjectOrder " + strconv.Itoa(i)),}msg.WithDelayTimeLevel(3)r, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Println("发送消息错误: %v", err.Error())} else {fmt.Println("生产消息成功: " + r.String() + "-" + r.MsgID)}}err = p.Shutdown()if err != nil {fmt.Println("生产者shutdown: %v", err.Error())os.Exit(1)}
}func ComsumeMsg(mqAddr string, topic string) {c, err := rocketmq.NewPushConsumer(consumer.WithGroupName(groupName),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),)if err != nil {panic(err)}err = c.Subscribe(topic, consumer.MessageSelector{},func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgList {fmt.Printf("订阅消息,消费%v \n", msgList[i])}return consumer.ConsumeSuccess, nil})if err != nil {fmt.Println("消费消息错误: %v", err.Error())}err = c.Start()if err != nil {fmt.Println("开启消费这错误: %v", err.Error())}time.Sleep(time.Hour)err = c.Shutdown()if err != nil {fmt.Println("shutdown消费者错误: %v", err.Error())}
}func main() {topic := "BBS_SHOP_TOPIC_123"mqAddr := GetMqAddr()ProduceMsg(mqAddr, topic)ComsumeMsg(mqAddr, topic)
}
- 以上是一个 demo,在真实场景中请自行进行封装处理
- 这里定义了一个主题 BBS_SHOP_TOPIC_123 和一个订阅组 BBS_SHOP_GROUP_123
- 查看生产消息输出(注意:下面输出中的 brokerName 为 broker-bbs,与上面 broker.conf 中的 broker-a 不一致,以实际配置为准)
生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80001, offsetMsgId=C0A87C0600002A9F0000000000000000, queueOffset=0, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80001 生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80002, offsetMsgId=C0A87C0600002A9F00000000000000DE, queueOffset=1, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]]-C0A87C06209F0000000017eef7e80002 生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80003, offsetMsgId=C0A87C0600002A9F00000000000001BC, queueOffset=2, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]]-C0A87C06209F0000000017eef7e80003 生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80004, offsetMsgId=C0A87C0600002A9F000000000000029A, queueOffset=3, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]]-C0A87C06209F0000000017eef7e80004 生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80005, offsetMsgId=C0A87C0600002A9F0000000000000378, queueOffset=4, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80005
- 查看消费(10s 之后)
订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 0, Flag=0, properties=map[CONSUME_START_TIME:1717572747702 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80001], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80001, OffsetMsgId=C0A87C0600002A9F0000000000000456,QueueId=1, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737685, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747694, StoreHost=192.168.124.6:10911, CommitLogOffset=1110, BodyCRC=407480418, ReconsumeTimes=0, PreparedTransactionOffset=0] 订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 1, Flag=0, properties=map[CONSUME_START_TIME:1717572747706 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:2 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80002], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80002, OffsetMsgId=C0A87C0600002A9F0000000000000533,QueueId=2, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737698, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747701, StoreHost=192.168.124.6:10911, CommitLogOffset=1331, BodyCRC=1867421940, ReconsumeTimes=0, PreparedTransactionOffset=0] 订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 2, Flag=0, properties=map[CONSUME_START_TIME:1717572747710 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:3 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80003], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80003, OffsetMsgId=C0A87C0600002A9F0000000000000610,QueueId=3, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737702, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747704, StoreHost=192.168.124.6:10911, CommitLogOffset=1552, BodyCRC=1984416078, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]" consumerGroup=BBS_SHOP_GROUP_123 offset=0 
订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 3, Flag=0, properties=map[CONSUME_START_TIME:1717572747713 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:0 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80004], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80004, OffsetMsgId=C0A87C0600002A9F00000000000006ED,QueueId=0, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737706, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747709, StoreHost=192.168.124.6:10911, CommitLogOffset=1773, BodyCRC=21035480, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]" consumerGroup=BBS_SHOP_GROUP_123 offset=1 INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]" consumerGroup=BBS_SHOP_GROUP_123 offset=1 INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]" consumerGroup=BBS_SHOP_GROUP_123 offset=1 订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 4, Flag=0, properties=map[CONSUME_START_TIME:1717572747717 DELAY:3 MAX_OFFSET:2 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80005], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80005, OffsetMsgId=C0A87C0600002A9F00000000000007CA,QueueId=1, StoreSize=221, QueueOffset=1, SysFlag=0, BornTimestamp=1717572737709, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747711, StoreHost=192.168.124.6:10911, CommitLogOffset=1994, BodyCRC=522480763, ReconsumeTimes=0, PreparedTransactionOffset=0]
- 以上就是生产和消费的主要过程
效果
总结
- 以上是简单的环境搭建和生产消息,以及延迟消费消息的 demo 示例
- 实际场景中,结合以上demo,对一些异步发送消息的场景进行灵活运用和升级