RocketMQ消息队列(二)—— Go语言操作RocketMQ

  上篇文章《RocketMQ消息队列(一)—— 基本概念和消息类型》记录了RocketMQ的一些基本的概念,本文主要写几个go语言操作RocketMQ的示例代码

一、发送普通消息

import ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {p, err := rocketmq.NewProducer(producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),producer.WithRetry(2), //指定重试次数)if err != nil {panic(err)}if err = p.Start(); err != nil {panic("启动producer失败")}topic := "test"// 构建一个消息message := primitive.NewMessage(topic, []byte("hello world!"))res, err := p.SendSync(context.Background(), message)if err != nil {fmt.Printf("send message error: %s\n", err)} else {fmt.Printf("send message success: result=%s\n", res.String())}if err = p.Shutdown(); err != nil {panic("关闭producer失败")}
}

二、消费消息

import ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive""time"
)func main() {c, err := rocketmq.NewPushConsumer(consumer.WithGroupName("test"),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),)if err != nil {panic(err)}if err := c.Subscribe("test",consumer.MessageSelector{},// 收到消息后的回调函数func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgs {fmt.Printf("获取到值: %v \n", msgs[i])}return consumer.ConsumeSuccess, nil}); err != nil {}err = c.Start()if err != nil {panic("启动consumer失败")}//不能让主goroutine退出time.Sleep(time.Hour)_ = c.Shutdown()
}

三、发送延迟消息

延迟消息和普通的发送区别就是在需要发送的消息上,用下面的代码设置发送的级别即可

message.WithDelayTimeLevel(3)

全部代码如下:


import ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {p, err := rocketmq.NewProducer(producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),producer.WithRetry(2), //指定重试次数)if err != nil {panic(err)}if err = p.Start(); err != nil {panic("启动producer失败")}topic := "test"// 构建一个消息message := primitive.NewMessage(topic, []byte("this is a delay message!"))// 给message设置延迟级别message.WithDelayTimeLevel(3)res, err := p.SendSync(context.Background(), message)if err != nil {fmt.Printf("send message error: %s\n", err)} else {fmt.Printf("send message success: result=%s\n", res.String())}if err = p.Shutdown(); err != nil {panic("关闭producer失败")}
}

四、发送事务消息

发送事务消息需要我们写一个TransactionListener接口的方法,指明事务执行成功和回调的具体操作,接口如下

type TransactionListener interface {//  When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.ExecuteLocalTransaction(*Message) LocalTransactionState// When no response to prepare(half) message. broker will send check message to check the transaction status, and this// method will be invoked to get local transaction status.CheckLocalTransaction(*MessageExt) LocalTransactionState
}

完整的代码如下:

import ("context""fmt""os""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)type DemoListener struct {
}func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {fmt.Println("开始执行本地逻辑")time.Sleep(time.Second * 3)fmt.Println("执行本地逻辑失败")//本地执行逻辑无缘无故失败 代码异常 宕机return primitive.UnknowState
}func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {fmt.Println("rocketmq的消息回查")return primitive.CommitMessageState
}func main() {p, _ := rocketmq.NewTransactionProducer(&DemoListener{},producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),producer.WithRetry(1),)err := p.Start()if err != nil {fmt.Printf("start producer error: %s\n", err.Error())os.Exit(1)}res, err := p.SendMessageInTransaction(context.Background(),primitive.NewMessage("TopicTest5", []byte("Hello RocketMQ again ")))if err != nil {fmt.Printf("send message error: %s\n", err)} else {fmt.Printf("send message success: result=%s\n", res.String())}time.Sleep(5 * time.Minute)err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
}

后记
  个人总结,欢迎转载、评论、批评指正

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/673018.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jvm垃圾收集器特性描述

在Java虚拟机(JVM)中,垃圾回收器(Garbage Collector, GC)是自动管理内存的重要组成部分,其主要职责是识别和删除不再被使用的对象,以释放和回收内存资源。随着技术的发展,为了适应不…

《合成孔径雷达成像算法与实现》Figure6.10

clc clear close all参数设置 距离向参数设置 R_eta_c 20e3; % 景中心斜距 Tr 2.5e-6; % 发射脉冲时宽 Kr 20e12; % 距离向调频率 alpha_os_r 1.2; % 距离过采样率 Nrg 320; % 距离线采样数 距离向…

水题中的稀奇古怪trick合集

状态转移问题,一个状态的改变还会牵涉到此状态之前的状态时,很难利用简单的动态规划解决,可以考虑利用BFS队列优化,把更新过的状态存进队列中,队列空时停止 例题:2024牛客寒假集训2D-Tokitsukaze and Slash…

数据结构之归并排序

所谓“归并”,是将两个或两个以上的有序文件合并成为一个新的有序文件。归并排序的一种实现方法是把一个有n个记录的无序文件看成是由n个长度为1的有序子文件组成的文件,然后进行两两归并,得到[ n 2 \frac n2 2n​]个长度为2或1的有序文件&am…

UUID和雪花(Snowflake)算法该如何选择?

UUID和雪花(Snowflake)算法该如何选择? UUID 和 Snowflake 都可以生成唯一标识,在分布式系统中可以说是必备利器,那么我们该如何对不同的场景进行不同算法的选择呢,UUID 简单无序十分适合生成 requestID, Snowflake 里…

Flink实战六_直播礼物统计

接上文:Flink实战五_状态机制 1、需求背景 现在网络直播平台非常火爆,在斗鱼这样的网络直播间,经常可以看到这样的总榜排名,体现了主播的人气值。 人气值计算规则:用户发送1条弹幕互动,赠送1个荧光棒免费…

c++学习:基本变量类型+宽字符用法

数据类型 描述 大小(通常情况 下) 用途 int 整型 至少 16 位 存储整数 short int 短整型 至少 16 位 存储较…

λ-矩阵知识点

原文:链接 λ-矩阵 若矩阵 A \mathbf{A} A 的元素为关于 λ λ λ 的多项式,则称 A \mathbf{A} A 为 λ λ λ-矩阵 (表示为 A ( λ ) \mathbf{A}(λ) A(λ)). λ λ λ-矩阵也存在秩、逆、初等变换、相抵的概念, 但是有一些不同. 定义. λ λ λ-矩阵的秩是…

07-使用Package、Crates、Modules管理项目

上一篇:06-枚举和模式匹配 当你编写大型程序时,组织代码将变得越来越重要。通过对相关功能进行分组并将具有不同功能的代码分开,您可以明确在哪里可以找到实现特定功能的代码,以及在哪里可以改变功能的工作方式。 到目前为止&…

必收藏!第六版CCF推荐会议C类国际学术会议!(中国计算机学会)

中国计算机学会 中国计算机学会(CCF)是全国性、学术性、非营利的学术团体,由从事计算机及相关科学技术领域的个人和单位自愿组成。作为独立社团法人,CCF是中国科学技术协会的成员之一,是全国一级学会! CCF的…

JavaScript基础第四天

JavaScript 基础第四天 今天我们学习js的函数,包括普通函数、匿名函数、箭头函数以及函数作用域。 1. 函数的初体验 1.1. 什么是函数 函数是 JavaScript 中的基本组件之一。一个函数是 JavaScript 过程一组执行任务或计算值的语句。要使用一个函数,你…

Linux下库函数、静态库与动态库

库函数 什么是库 库是二进制文件, 是源代码文件的另一种表现形式, 是加了密的源代码; 是一些功能相近或者是相似的函数的集合体. 使用库有什么好处 提高代码的可重用性, 而且还可以提高程序的健壮性;可以减少开发者的代码开发量, 缩短开发周期. 库制作完成后, 如何给用户…

大模型为什么会有 tokens 限制?

人是以字数来计算文本长度,大语言模型 (LLM)是以 token 数来计算长度的。LLM 使用 token 把一个句子分解成若干部分。 token 可以是一个单词、一个单词中的一个部分、甚至是一个字符,具体取决于它使用的标记化方法 (tokenization…

为电子表格嵌入数据库,Excel/WPS一键升级为管理系统

将Excel表格转化为管理系统,这款工具能够实现只需导入表格数据,即可自动生成相应的软件和APP。 表格办公的烦恼,有遇到吧? 对于具有一定规模的企业而言,各类表格如同繁星般众多,既有日常使用的常规表格&a…

泰克示波器——TBS2000系列界面整体介绍

目录 1.1 通道区域面板标识1.2 示波器测试输出(检测探针与设置的好坏)1.3 面板其他快捷按钮1.4 波器整体界面 1.1 通道区域面板标识 在通道面板的下方标识有示波器的通道属性以及参数值,如我使用的型号为“TBS2104X”的示波器,面…

格子表单GRID-FORM | 文档网站搭建(VitePress)与部署(Github Pages)

格子表单/GRID-FORM已在Github 开源,如能帮到您麻烦给个星🤝 GRID-FORM 系列文章 基于 VUE3 可视化低代码表单设计器嵌套表单与自定义脚本交互文档网站搭建(VitePress)与部署(Github Pages) 效果预览 格…

如何使用VMware分享出来的虚拟机系统(OVF文件)

前言 这几天看到很多小伙伴都在安装虚拟机,但成不成就不知道了。 所以小白准备把自己安装完成的系统打包分享给小伙伴。 如果你需要已经安装完成的虚拟系统,可以获取哦!打开即用! 虚拟机系统包括: Win10 专业版 Wi…

关于git经常会碰到的几个场景分析

1.把其他远程分支代码同步合并到本地分支,比如:git将另一个远程分支origin/develop代码合入我现在的本地分支B,本地分支对应的远程分支是origin/b。 要将远程分支 origin/develop 的代码合并到本地分支 B(对应的远程分支是 origin/B&#xf…

anaconda+pytorch+pycharm安装总结

1.下载最新的Anaconda,目前是python3.11适用 anaconda官网 安装教程 卸载并重新安装的教程 (如果找不到火绒清理注册表垃圾的位置可以拉到文章底部查看) 2.pytorch安装,注意python版本、cuda版本和pytorch版本的适配 安装教程 3.pycharm安装和…

C++: 模板初阶

目录 引子&#xff1a; 函数模板 格式&#xff1a; 例子&#xff1a; 函数模板的实例化&#xff1a; 隐形实例化&#xff1a;让编译器根据实参推演模板参数的实际类型 显式实例化&#xff1a; 在函数名后的<>中指定模板参数的实际类型 模板参数的匹配原则 类模…