这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
RocketMQ版本
- 5.1.0
背景
我们都知道RocketMQ
5.x
新增了proxy
模式部署方式,也就是支持了GRPC
的消费方式消费,所以今天我们来试试
本次使用的开发语言是goland
前置条件
这里默认我们已经部署了RocketMQ
proxy
,如果不会部署的可以参考我之前的文章
依赖管理
本次使用的依赖管理方式是go.mod
使用的goland
sdk是github.com/apache/rocketmq-clients/golang
也就是这个开源项目
我们直接执行
go get github.com/apache/rocketmq-clients/golang@master
以master
分支作为我们的依赖
发送消息
package mainimport ("context""fmt""log""os""strconv""time"rmq_client "github.com/apache/rocketmq-clients/golang""github.com/apache/rocketmq-clients/golang/credentials"
)const (Topic = "xiao-zou-topic"Endpoint = "127.0.0.1:8081"AccessKey = "xxxxxx"SecretKey = "xxxxxx"
)func main() {os.Setenv("mq.consoleAppender.enabled", "true")rmq_client.ResetLogger()// new producer instanceproducer, err := rmq_client.NewProducer(&rmq_client.Config{Endpoint: Endpoint,Credentials: &credentials.SessionCredentials{AccessKey: AccessKey,AccessSecret: SecretKey,},},rmq_client.WithTopics(Topic),)if err != nil {log.Fatal(err)}// start producererr = producer.Start()if err != nil {log.Fatal(err)}// graceful stop producerdefer producer.GracefulStop()for i := 0; i < 10; i++ {// new a messagemsg := &rmq_client.Message{Topic: Topic,Body: []byte("this is a message : " + strconv.Itoa(i)),}// set keys and tagmsg.SetKeys("a", "b")msg.SetTag("ab")// send message in syncresp, err := producer.Send(context.TODO(), msg)if err != nil {log.Fatal(err)}for i := 0; i < len(resp); i++ {fmt.Printf("%#v\n", resp[i])}// wait a momenttime.Sleep(time.Second * 1)}
}
我们可以直接运行,然后看到消息发送成功了
消息消费
package mainimport ("context""fmt""log""os""time"rmq_client "github.com/apache/rocketmq-clients/golang""github.com/apache/rocketmq-clients/golang/credentials"
)const (Topic = "xiao-zou-topic"ConsumerGroup = "gid-xiaozou-grpc"Endpoint = "127.0.0.1:8081"AccessKey = "xxxxxx"SecretKey = "xxxxxx"
)var (// maximum waiting time for receive funcawaitDuration = time.Second * 5// maximum number of messages received at one timemaxMessageNum int32 = 16// invisibleDuration should > 20sinvisibleDuration = time.Second * 20// receive messages in a loop
)func main() {// log to consoleos.Setenv("mq.consoleAppender.enabled", "true")rmq_client.ResetLogger()// new simpleConsumer instancesimpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{Endpoint: Endpoint,ConsumerGroup: ConsumerGroup,Credentials: &credentials.SessionCredentials{AccessKey: AccessKey,AccessSecret: SecretKey,},},rmq_client.WithAwaitDuration(awaitDuration),rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{Topic: rmq_client.SUB_ALL,}),)if err != nil {log.Fatal(err)}// start simpleConsumererr = simpleConsumer.Start()if err != nil {log.Fatal(err)}// graceful stop simpleConsumerdefer simpleConsumer.GracefulStop()go func() {for {fmt.Println("start receive message")mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)if err != nil {fmt.Println(err)}// ack messagefor _, mv := range mvs {simpleConsumer.Ack(context.TODO(), mv)msg := string(mv.GetBody())fmt.Println(msg)}fmt.Println("wait a moment")fmt.Println()time.Sleep(time.Second * 3)}}()select {}
}
执行结果:
源码
相关源码已上传到github,需要可以自取
https://github.com/weihubeats/java-to-go-learning/tree/main/student/rocketmq-demo
总结
可以看到我们使用GRPC
的方式消费和发送消息都成功了,但是需要注意的是目前rocketmq-clients
还不是很稳定,有一些bug,生产使用还是需要谨慎