RocketMQ 5.x如何使用GRPC方式发送消费消息

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • 5.1.0

背景

我们都知道RocketMQ 5.x新增了proxy模式部署方式,也就是支持了GRPC的消费方式消费,所以今天我们来试试

本次使用的开发语言是goland

前置条件

这里默认我们已经部署了RocketMQ proxy,如果不会部署的可以参考我之前的文章

依赖管理

本次使用的依赖管理方式是go.mod
使用的goland sdk是github.com/apache/rocketmq-clients/golang

也就是这个开源项目

我们直接执行

go get github.com/apache/rocketmq-clients/golang@master

master分支作为我们的依赖

发送消息

package mainimport ("context""fmt""log""os""strconv""time"rmq_client "github.com/apache/rocketmq-clients/golang""github.com/apache/rocketmq-clients/golang/credentials"
)const (Topic     = "xiao-zou-topic"Endpoint  = "127.0.0.1:8081"AccessKey = "xxxxxx"SecretKey = "xxxxxx"
)func main() {os.Setenv("mq.consoleAppender.enabled", "true")rmq_client.ResetLogger()// new producer instanceproducer, err := rmq_client.NewProducer(&rmq_client.Config{Endpoint: Endpoint,Credentials: &credentials.SessionCredentials{AccessKey:    AccessKey,AccessSecret: SecretKey,},},rmq_client.WithTopics(Topic),)if err != nil {log.Fatal(err)}// start producererr = producer.Start()if err != nil {log.Fatal(err)}// graceful stop producerdefer producer.GracefulStop()for i := 0; i < 10; i++ {// new a messagemsg := &rmq_client.Message{Topic: Topic,Body:  []byte("this is a message : " + strconv.Itoa(i)),}// set keys and tagmsg.SetKeys("a", "b")msg.SetTag("ab")// send message in syncresp, err := producer.Send(context.TODO(), msg)if err != nil {log.Fatal(err)}for i := 0; i < len(resp); i++ {fmt.Printf("%#v\n", resp[i])}// wait a momenttime.Sleep(time.Second * 1)}
}

我们可以直接运行,然后看到消息发送成功了

消息消费

package mainimport ("context""fmt""log""os""time"rmq_client "github.com/apache/rocketmq-clients/golang""github.com/apache/rocketmq-clients/golang/credentials"
)const (Topic         = "xiao-zou-topic"ConsumerGroup = "gid-xiaozou-grpc"Endpoint      = "127.0.0.1:8081"AccessKey     = "xxxxxx"SecretKey     = "xxxxxx"
)var (// maximum waiting time for receive funcawaitDuration = time.Second * 5// maximum number of messages received at one timemaxMessageNum int32 = 16// invisibleDuration should > 20sinvisibleDuration = time.Second * 20// receive messages in a loop
)func main() {// log to consoleos.Setenv("mq.consoleAppender.enabled", "true")rmq_client.ResetLogger()// new simpleConsumer instancesimpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{Endpoint:      Endpoint,ConsumerGroup: ConsumerGroup,Credentials: &credentials.SessionCredentials{AccessKey:    AccessKey,AccessSecret: SecretKey,},},rmq_client.WithAwaitDuration(awaitDuration),rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{Topic: rmq_client.SUB_ALL,}),)if err != nil {log.Fatal(err)}// start simpleConsumererr = simpleConsumer.Start()if err != nil {log.Fatal(err)}// graceful stop simpleConsumerdefer simpleConsumer.GracefulStop()go func() {for {fmt.Println("start receive message")mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)if err != nil {fmt.Println(err)}// ack messagefor _, mv := range mvs {simpleConsumer.Ack(context.TODO(), mv)msg := string(mv.GetBody())fmt.Println(msg)}fmt.Println("wait a moment")fmt.Println()time.Sleep(time.Second * 3)}}()select {}
}

执行结果:

源码

相关源码已上传到github,需要可以自取

https://github.com/weihubeats/java-to-go-learning/tree/main/student/rocketmq-demo

总结

可以看到我们使用GRPC的方式消费和发送消息都成功了,但是需要注意的是目前rocketmq-clients还不是很稳定,有一些bug,生产使用还是需要谨慎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/30341.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SAN共享存储架构

SAN共享存储架构 概述 近年在高性能专用存储网络需求的驱使下&#xff0c;许多SAN存储系统应用于高性能计算网络系统、大型网站系统、非线性编辑系统等网络系统中&#xff0c;存储设备与计算机主机系统之间一对一的关系&#xff0c;被可供多个计算机主机共享读写同一个存储设…

解决Consider the following: If you want an embedded database (H2, HSQL or Derby)

问题描述&#xff1a; 2023-08-10 11:52:32.992 ERROR 13064 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPLICATION FAILED TO START ***************************Description:Failed to configure a DataSource: url …

探究Vue源码:mustache模板引擎(10) 解决不能用连续点符号找到多层对象问题,为编译循环结构做铺垫

上文 探究Vue源码:mustache模板引擎(9) 将单层无喜欢结果tokens转为dom字符串 我们简单处理了 token转字符串的业务逻辑 但是 我们只处理了最贱的花括号 接下来 带着大家将井号的也处理一下 我们打开项目 将 www中的index.html代码改回之前的这样 <!DOCTYPE html> <h…

通过PMP考试的伙伴看过来!免试多拿一个证书!

有PMP电子证书或纸质证书的伙伴可以免考申领国家CSPM二级证书&#xff01;&#xff08;项目管理专业人员评价国标证书&#xff09;&#xff01;免试&#xff0c;多拿一个证书&#xff0c;真香&#xff01; 本周已经开始提交新一批名单! 现在持有PMP证书可以免培训、免考试申报…

改进的麻雀算法优化最大相关峭度解卷积(SCSSA-MCKD),实现早期微弱故障诊断,MATLAB代码实现

01 引言 由于一些设备的早期故障产生的冲击十分微弱&#xff0c;易被系统噪声干扰&#xff0c;如何有效地对设备的原始故障信号进行降噪并增强信号中微弱冲击成分&#xff0c;是进行该类部件早期故障诊断的关键。 最大相关峭度解卷积&#xff08;MCKD&#xff09;通过解卷积运算…

【UE】VS无法调试,不能进入断点、未命中断点、断点不可用解决办法

问题&#xff1a;通过 附加进程的方式 调试DS&#xff0c;部分代码可以打断点&#xff0c;部分无法打断点 原因&#xff1a;XP限制一次加载的dll符号不能超过500个 解决&#xff1a; WinR 打开regedit在 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\Session Manag…

【HTML】label 标签

在HTML中&#xff0c;<label> 标签用于为表单元素创建标签文本或标题。它可以与输入字段&#xff08;如文本框、单选按钮、复选框等&#xff09;和其他表单元素关联起来&#xff0c;以提高可用性和可访问性。 <label> 元素有两种常见的用法&#xff1a; 包裹方式…

【VUE 监听用户滑动】

监听滑动方法 一. touchstart、touchmove、touchend二.v-touch三. 自定义指令 一. touchstart、touchmove、touchend 在 Vue 中监听用户往哪个方向滑动可以通过添加事件监听器&#xff0c;然后在事件回调函数中判断滑动方向。常用的事件监听器有touchstart、touchmove、touche…

leetcode 399-除法求值

法一&#xff1a;并查集 分析示例1&#xff1a; a / b 2.0 a/ b 2.0 a/b2.0&#xff0c;说明 a 2 b a2b a2b&#xff0c; a a a和 b b b在同一个集合中 b / c 3.0 b/c3.0 b/c3.0&#xff0c;说明 b 3 c b3c b3c&#xff0c; b b b和 c c c在同一个集合中 求 a / c a/…

24届近5年重庆邮电大学自动化考研院校分析

今天给大家带来的是重庆邮电大学控制考研分析 满满干货&#xff5e;还不快快点赞收藏 一、重庆邮电大学 学校简介 重庆邮电大学简称"重邮"&#xff0c;坐落于直辖市-重庆市&#xff0c;入选国家"中西部高校基础能力建设工程”、国家“卓越工程师教育培养计划…

c51单片机16个按键密码锁源代码(富proteus电路图)

注意了&#xff1a;这个代码你是没法直接运行的&#xff0c;但是如果你看得懂&#xff0c;随便改一改不超过1分钟就可以用 #include "reg51.h" #include "myheader.h" void displayNumber(unsigned char num) {if(num1){P10XFF;P10P11P14P15P160;}else if…

PyCharm新手入门指南

安装好Pycharm后&#xff0c;就可以开始编写第一个函数&#xff1a;Hello World啦~我们就先来学习一些基本的操作&#xff0c;主要包含新建Python文件&#xff0c;运行代码&#xff0c;查看结果等等。 文章主要包含五个部分&#xff1a; 一、界面介绍 主要分为菜单栏、项目目录…

osi模型

OSI 模型&#xff08;Open Systems Interconnection model&#xff09;是一个用于计算机网络体系结构的参考模型&#xff0c;由国际标准化组织&#xff08;ISO&#xff09;在 1984 年制定&#xff0c;旨在定义不同层次上的通信协议&#xff0c;以促进不同厂商的设备在网络上进行…

JQuery——动画效果

jQuery 提供了多种动画效果&#xff0c;可以让你在网页中添加平滑的过渡和动态效果。以下是一些常见的 jQuery 动画效果及其用法&#xff1a; 1. 隐藏和显示&#xff1a; 通过调用 .hide() 和 .show() 方法可以实现元素的渐隐和渐现效果。 $(#myElement).hide(); // 隐藏元素…

开发工具Eclipse的使用之导入项目(import)

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于Eclipse使用的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.导读 二.详细操作步骤 1.右击项…

吐血整理,Python接口自动化测试-接口关联依赖处理(详细)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 场景说明 在面试…

cpolar的基础使用方法

如何使用cpolar内网穿透&#xff1f; 文章目录 如何使用cpolar内网穿透&#xff1f;前言1. 在群辉NAS系统下安装cpolar套件2. 管理隧道列表3. 创建固定数据隧道 前言 群晖作为大容量存储系统&#xff0c;既可以作为个人的私有存储设备&#xff0c;也可以放在小型企业中作为数据…

创建两个线程,其中一个线程读取文件中的数据,另外一个线程将读取到的内容打印到终端上,类似实现cat一个文件。 cat数据完毕后,要结束两个线程。

#include <stdio.h> #include <pthread.h>#define BUFFER_SIZE 99999 //足够大// 全局共享的数据缓冲区 char buffer[BUFFER_SIZE]; int buffer_length 0;// 锁和条件变量用于线程同步 pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond …

vue3 Hooks 封装loading使用

vue3 Hooks 封装loading使用 个人理解&#xff1a;Hooks 就是 钩子 的意思&#xff0c;在特定时机执行的函数 之前不理解Hooks和自定义封装的utils函数有什么区别&#xff0c;它们都是函数&#xff0c;逐步理解到utils函数没有vue里面的响应式api&#xff0c;而自定义Hooks可…

基于微信小程序的传染病酒店隔离平台设计与实现(Java+spring boot+MySQL+微信小程序)

获取源码或者论文请私信博主 演示视频&#xff1a; 基于微信小程序的传染病酒店隔离平台设计与实现&#xff08;Javaspring bootMySQL微信小程序&#xff09; 使用技术&#xff1a; 前端&#xff1a;html css javascript jQuery ajax thymeleaf 微信小程序 后端&#xff1a;…