Go微服务: 分布式之发送带有事务消息的示例

分布式之发送带有事务消息

  • 现在做一个RocketMQ的事务消息的 demo

1 )生产者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)// 自定义结构体,为了实现 NewTransactionProducer 第一个参数的接口
type MyListener struct{}func (hl MyListener) ExecuteLocalTransaction(*primitive.Message) primitive.LocalTransactionState {return primitive.CommitMessageState
}func (hl MyListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {return primitive.CommitMessageState
}func main() {// ------------ 1. 连接RocketMQ ------------------------mqAddr := "127.0.0.1:9876" // 模拟地址// NewTransactionProducer 这个方法第一个参数是一个 Listener// 是一个接口,需要一个接口体去实现它的方法p, err := rocketmq.NewTransactionProducer( // 开启事物消息生产者MyListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err) // 生产环境禁用panic}// ------------ 2. 启动RocketMQ ------------------------err = p.Start()if err != nil {panic(err)}// ------------ 3. 发送RocketMQ 消息 ------------------------res, err := p.SendMessageInTransaction(context.Background(),primitive.NewMessage("MyTransactionTopic", []byte("xxxxxxxxxxxyyyyyyyyyyyyyzzzzzzzzzzzz")),)fmt.Println(res.Status)if err != nil {panic(err)}fmt.Printf("发送成功")time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
}
  • 可见, primitive.LocalTransactionState 是返回值
  • 进入这个包中,它有三个状态
    • CommitMessageState 提交状态
    • RollbackMessageState 回滚状态
    • UnknowState 未知状态
  • 在后续,这三个状态都可以再试一试

2 ) 运行后,在UI界面查看消息


2.1 输出信息如下

INFO[0000] change the route for clients                 
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-bbs\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-bbs\",\"brokerAddrs\":{\"0\":\"192.168.124.6:10911\"}}]}" changedFrom="<nil>" topic=MyTransactionTopic
0
发送成功

2.2 运行效果

2 )消费者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {mqAddr := "127.0.0.1:9876"topic := "MyTransactionTopic"groupName := "ddddddd"c, err := rocketmq.NewPushConsumer(consumer.WithGroupName(groupName),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),)if err != nil {panic(err)}err = c.Subscribe(topic, consumer.MessageSelector{},func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgList {fmt.Printf("订阅消息,消费%v \n", msgList[i])}return consumer.ConsumeSuccess, nil})if err != nil {fmt.Println("消费消息错误: %v", err.Error())}err = c.Start()if err != nil {fmt.Println("开启消费这错误: %v", err.Error())}time.Sleep(time.Hour)err = c.Shutdown()if err != nil {fmt.Println("shutdown消费者错误: %v", err.Error())}
}
  • 在RocketMQ中,事务消息的处理机制涉及到生产者和消费者两端的协作,但与普通消息消费模式有所区别

  • 事务消息的消费端并不直接参与到事务的两阶段提交过程中,它更像是一个“半事务消息”的确认者

  • 具体流程如下:

    • 生产者发送事务消息:生产者发送一条半事务消息到MQ服务器,并立即返回,此时消息处于“Prepare”状态
    • MQ Server回调生产者确认:MQ服务器会回调生产者提供的事务监听器(在Go示例中是HappyListener),执行本地事务。生产者需在此阶段执行事务操作并决定是提交还是回滚该消息
    • 生产者根据本地事务结果告知MQ Server:生产者根据本地事务执行结果,通过事务状态检查接口告诉MQ服务器是提交还是回滚这条半事务消息。
    • 消息变为可消费状态:MQ服务器根据生产者的决定,将消息标记为Commit或Rollback,Commit后的消息才对普通消费者可见
  • 因此,对于事务消息的消费者来说,其主要职责是消费那些已经被事务提交成功的消息,而不需要直接参与事务的提交或回滚过程

  • 消费者代码看起来与普通消息的消费者相似,但消费的消息实际上是生产者已经提交成功的事务消息

  • 不过,如果您的需求是希望消费者也以某种形式参与到事务的最终确认中,比如基于消息的消费结果来决定是否提交事务,这在RocketMQ的标准事务消息模型中并不直接支持

  • RocketMQ的事务模型主要关注于保证消息生产和本地事务的原子性,消费者更多的是作为事务结果的后续处理者角色

  • 在提供的消费者示例中,尽管它看起来是一个普通的消费者,但实际上它处理的是生产者通过事务消息流程提交后的内容,这符合事务消息的消费逻辑

  • 如果需要在消费端实现更复杂的逻辑来间接响应事务状态,可能需要结合业务系统进行额外的设计,比如通过监听数据库状态变化、消息队列的死信队列特性或其他补偿机制来处理未决事务

延迟性事务消息

  • 您需要在创建消息时指定消息的延迟等级,而不是在生产者配置或消息发送后进行延迟
  • RocketMQ支持多种延迟等级,每种等级对应不同的延迟时间
  • 注意,事务消息和延迟消息的直接组合在RocketMQ中并不是直接支持的特性
  • 因为事务消息的设计主要是围绕两阶段提交模型,确保消息发送与本地事务的一致性
  • 而延迟消息侧重于消息的定时投递
  • 然而,可以通过间接的方式结合这两个特性,即在事务消息的本地事务逻辑中包含对延迟操作的处理
  • 下面的示例尝试模拟一种结合方式,但请注意,这仅是一种逻辑上的结合,实际应用中需要根据具体业务场景仔细设计和测试
  • 方案思路:
    • 生产者:发送一个事务消息到特定的主题(例如DelayedTransactionTopic),该消息体中携带了需要进行延迟处理的信息。
    • 事务监听器:在ExecuteLocalTransaction方法中,不直接执行长时间的延迟逻辑,而是执行快速操作(如记录消息待处理状态或存入DB),然后返回primitive.Prepared状态。
    • 检查事务状态:在CheckLocalTransaction方法中,检查事务状态,如果需要,触发一个异步任务或消息队列中的消息,该消息携带延迟处理逻辑和真正的延迟时间。
    • 延迟处理服务:这个服务从队列中取出消息并根据消息中的指示进行真正的延迟操作,例如通过内部队列或定时任务系统(如分布式定时任务框架)来实现延迟执行
  • 相关生产者伪代码未完全实现,仅供参考
    package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
    )type DelayedTxListener struct{}func (d DelayedTxListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {// 假设这里将消息标识为待处理,并记录相关信息到数据库fmt.Println("Preparing transaction, storing message meta...")return primitive.Prepared
    }func (d DelayedTxListener) CheckLocalTransaction(msgExt *primitive.MessageExt) primitive.LocalTransactionState {// 在这里检查消息是否准备好执行延迟操作// 实际操作可能包括从数据库查询该消息的状态// 假设我们已经确定需要进行延迟操作,这里直接模拟提交return primitive.CommitMessageState
    }func main() {mqAddr := "127.0.0.1:9876"p, err := rocketmq.NewTransactionProducer(DelayedTxListener{},producer.WithNameServer([]string{mqAddr}),)if err != nil {panic(err)}err = p.Start()if err != nil {panic(err)}msg := &primitive.Message{Topic: "DelayedTransactionTopic",Body:  []byte("消息内容,可以包含延迟处理的详细信息"),}res, err := p.SendMessageInTransaction(context.Background(), msg)if err != nil {panic(err)}fmt.Printf("发送事务消息状态: %v\n", res.Status)time.Sleep(time.Second * 3600)err = p.Shutdown()if err != nil {panic(err)}
    }
    
  • 注意事项
    • 这个示例主要是概念性的,展示了如何在事务消息的上下文中计划后续的延迟处理步骤,而没有直接实现延迟消息的发送。
    • 实际应用中,您可能需要实现一个额外的后台服务或消息队列来处理这些“计划”好的延迟任务,确保它们能在预定时间得到执行。
    • 事务消息的两阶段提交机制仍然适用,只是延迟操作本身不在RocketMQ的直接事务控制范围内,而是作为一种业务逻辑上的后续处理。
    • 务必根据您的具体业务需求和RocketMQ的版本特性,仔细设计和测试这样的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/26344.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Oracle 19C 数据库表被误删除的模拟恢复

Oracle 19C 数据库表被误删除的模拟恢复操作 1、模拟创建表用于恢复测试 sqlplus zzh/zzh SQL> create table obj_tb tablespace users as select * from dba_objects; Table created. SQL> select count(*) from obj_tb; COUNT(*) ---------- 72373 2、记录当前…

博客摘录「 AXI三种接口及DMA DDR XDMA介绍(应用于vivado中的ip调用)」2024年6月10日

关键要点&#xff1a; 1.AXI Stream经过协议转换可使用AXI_FULL&#xff08;PS与PL间的接口&#xff0c;如GP、HP和ACP&#xff09;。 2.传输数据类里就涉及一个握手协议&#xff0c;即在主从双方数据通信前&#xff0c;有一个握手的过程。基本内容&#xff1a;数据的传输源会…

浅谈配置元件之HTTP请求默认值

浅谈配置元件之HTTP请求默认值 在进行HTTP请求的测试计划设计时&#xff0c;"HTTP请求默认值"配置元件扮演着极其重要的角色&#xff0c;它能够简化测试计划的设置&#xff0c;提高测试效率。本问将详细介绍如何使用JMeter中的“HTTP请求默认值”配置元件。 HTTP请求…

rocketmq-5.1.2的dleger高可用集群部署

1、背景 原先为5.0.0版本&#xff0c;因检查出有漏洞&#xff0c;升级到5.1.2版本。 【Rocketmq是阿里巴巴在2012年开发的分布式消息中间件&#xff0c;专为万亿级超大规模的消息处理而设计&#xff0c;具有高吞吐量、低延迟、海量堆积、顺序收发等特点。在一定条件下&#xf…

天锐绿盾 | 无感知加密软件、透明加密系统、数据防泄漏软件

摘要&#xff1a;文件加密软件,包含禁止非授权的文件泄密和抄袭复制解决方案即使被复制泄密都是自动加密无法阅读,透明加密,反复制软件,内网监控,文件加密,网络安全方案,透明文件加密,加密文件,图纸加密,知识产权保护,加密数据; 通过绿盾信息安全管理软件&#xff0c;系统在不改…

3D线扫相机中的深度数据与激光反射强度数据获取及其应用

1. 引言 3D线扫相机&#xff08;3D line scan camera&#xff09;是一种高精度的三维测量设备&#xff0c;广泛应用于工业自动化、质量控制和精密测量等领域。与传统二维成像相机不同&#xff0c;3D线扫相机能够同时获取物体的深度信息和反射强度信息&#xff0c;从而为高精度…

传统工厂该如何做数字化转型?

传统工厂实现数字化转型需多方面着手&#xff0c;包括树立战略意识、明确目标规划&#xff0c;加强信息化建设、提升数据能力&#xff0c;培养引进人才、推动技术创新&#xff0c;优化业务流程、提高生产效率与质量管控&#xff0c;加强协同合作、实现产业链整合&#xff0c;建…

力扣爆刷第150天之TOP100五连刷(几数之和、堆排、合并链表)

力扣爆刷第150天之TOP100五连刷&#xff08;几数之和、堆排、合并链表&#xff09; 文章目录 力扣爆刷第150天之TOP100五连刷&#xff08;几数之和、堆排、合并链表&#xff09;一、15. 三数之和二、53. 最大子数组和三、912. 排序数组四、21. 合并两个有序链表五、1. 两数之和…

【CH32V305FBP6】USBD HS 中断分析

文章目录 前言中断分析 USBHS_IRQHandler传输完成&#xff1a;USBHS_UIF_TRANSFERTOKEN_IN&#xff1a;发送完成TOKEN_OUT&#xff1a;接收完成 描述符&#xff1a;USBHS_UIF_SETUP_ACT总线复位&#xff1a;USBHS_UIF_BUS_RST总线挂起&#xff1a;USBHS_UIF_SUSPEND 前言 所有…

JavaScript创建函数和对象的常用方法

简介 随着版本的更新&#xff0c;JavaScript中存在大量的创建函数和对象的方法&#xff0c;下面是一些常见的方式以及对应的优缺点&#xff0c;内容参考&#xff1a;ES6 入门教程 创建函数 1. 函数声明&#xff08;Function Declaration&#xff09; function myFunction(a…

np.arctan2和np.arctan

np.arctan2 和 np.arctan 都是用于计算反正切函数的 NumPy 函数&#xff0c;但它们的使用和功能有所不同。 np.arctan2 np.arctan2(y, x) 计算 atan2(y,x)&#xff0c;即从坐标 (x,y)到原点的角度&#xff08;弧度&#xff09;。它考虑了两个参数的符号来确定正确的象限&…

神经网络 torch.nn---nn.RNN()

torch.nn - PyTorch中文文档 (pytorch-cn.readthedocs.io) RNN — PyTorch 2.3 documentation torch.nn---nn.RNN() nn.RNN(input_sizeinput_x,hidden_sizehidden_num,num_layers1,nonlinearitytanh, #默认tanhbiasTrue, #默认是Truebatch_firstFalse,dropout0,bidirection…

Android开发之音乐播放器添加排行需求

Music统计功能需求 1.记录歌曲名称与次数(歌曲播放结束算一次)&#xff0c;根据播放次数制作一个排行列表;&#xff08;开始说要记录歌手&#xff0c;后面debug发现这个字段没有&#xff0c;暂时不记录&#xff09; 2.记录播放歌曲的时长&#xff0c;时间累加&#xff1b;&…

sourcemap

sourcemap介绍 什么是sourceMap sourcemap是为了解决开发代码与实际运行代码不一致时帮助我们debug到原始开发代码的技术webpack通过配置可以自动给我们source maps文件&#xff0c;map文件是一种对应编译文件和源文件的方法 类型含义source-map原始代码 最好的sourcemap质量…

element table 点击某一行中按钮加载

在Element UI中&#xff0c;实现表格&#xff08;element-table&#xff09;中的这种功能通常涉及到数据处理和状态管理。当你点击某一行的按钮时&#xff0c;其他行的按钮需要动态地切换为加载状态&#xff0c;这可以通过以下步骤实现&#xff1a; 1.表格组件&#xff1a;使用…

一文读懂Web Codecs API:浏览器背后的媒体魔术师

引言 ​在早期的Web 网页中&#xff0c;视频播放通常要依靠 Flash 和 Silverlight 等插件来完成&#xff0c;浏览器是不支持直接播放视频的。 随着网络技术的发展&#xff0c;视频这种媒体方式的需求变得普遍&#xff0c;HTML5中&#xff0c;出现了一个新的元素Video&#xf…

【全开源】旅行吧旅游门票预订系统源码(FastAdmin+ThinkPHP+Uniapp)

&#x1f30d;旅游门票预订系统&#xff1a;畅游世界&#xff0c;一键预订 一款基于FastAdminThinkPHPUniapp开发的旅游门票预订系统&#xff0c;支持景点门票、导游产品便捷预订、美食打卡、景点分享、旅游笔记分享等综合系统&#xff0c;提供前后台无加密源码&#xff0c;支…

RabbitMQ延迟消息(通过死信交换机实现)

延迟消息&#xff1a;生产者发送消息时指定一个时间&#xff0c;消费者不会立刻收到消息&#xff0c;而是在指定时间后才收到消息 通过DLX和TTL模拟出延迟队列的功能&#xff0c;即&#xff0c;消息发送以后&#xff0c;不让消费者拿到&#xff0c;而是等待过期时间&#xff0…

山东大学软件学院多核平台上的并行计算期末回忆版

&#xff08;2021级&#xff0c;大数据专业&#xff0c;老师是lwg和yzk&#xff0c;考题全是考前老师说的原题&#xff0c;毫无变化&#xff0c;最终期末分还是看实验情况多一些&#xff0c;但是老师到底是怎么比较的大家的实验性能&#xff0c;让我很头大&#xff0c;晕~&…

linux驱动学习(十三)之锁

需要板子一起学习的可以这里购买&#xff08;含资料&#xff09;&#xff1a;点击跳转 一、锁的作用 1、同步和互斥 1)同步:同一件事情的依次处理&#xff0c;数据的接收---> 数据的处理 --->数据的发送 2)互斥---- 防止对临界资源的竞争&#xff0c;在一个时刻&#…