SpringCloudStream+Rocket事务消息配置

本文用到的版本

spring-cloud-stream 3.2.6

rocketmq-client 4.9.4

spring-cloud-starter-stream-rocketmq 2021.0.5.0

一、依赖导入

     <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency>

这个版本不再需要引入rocketmq-spring-boot-starter这个依赖

这个版本的stream不支持@EnableBinding注解,这个版本的rocketmq不支持txProducerGroup参数。

二、编写生产者

1.写配置

application.yml增加如下配置

spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:addBounsChannel-out-0:producer:producerType: TranstransactionListener: addBounsStreamTransactionListenerbindings:## 新版本固定格式  channel名字-{out/in}-{index}addBounsChannel-out-0:destination: add-bounsgroup: bouns-producer-group

这里事务的配置参考官方文档:https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new.adoc

注意:bingdings下面的channel只能有一个in和一个out,不能配置多个in 多个out,否则会引起配置混乱。

注意:事务配置新旧版本有变化

旧版为

spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true

新版为

spring.cloud.stream.rocketmq.bindings.output2.producer.producerType=Trans

如果不确定版本,可以直接查看下面这个类的属性。com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties

2.写代码

发送代码

    @Autowiredprivate StreamBridge streamBridge;String transactionId = UUID.randomUUID().toString();streamBridge.send("addBounsChannel-out-0",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()).setHeader("TRANSACTION_ID", transactionId).setHeader("share_id", id).setHeader("dto", JSON.toJSONString(auditDTO)).build()

send的第一个参数与yml里的channel名保持一致

事务代码

package com.itmuch.contentcenter.rocketmq;import com.alibaba.fastjson.JSON;
import com.itmuch.contentcenter.dao.content.RocketmqTransactionLogMapper;
import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
import com.itmuch.contentcenter.domain.entity.content.RocketmqTransactionLog;
import com.itmuch.contentcenter.service.content.ShareService;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Map;@Component
public class AddBounsStreamTransactionListener implements TransactionListener {@Autowiredprivate ShareService shareService;@Resourceprivate RocketmqTransactionLogMapper rocketmqTransactionLogMapper;@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {Map<String, String> headers = msg.getProperties();String transactionId = (String) headers.get("TRANSACTION_ID");Integer shareId = Integer.valueOf((String) headers.get("share_id"));ShareAuditDTO auditDTO = JSON.parseObject(headers.get("dto"), ShareAuditDTO.class);try {shareService.auditByIdInDBWithRocketMqLog(shareId, auditDTO, transactionId);return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Map<String, String> headers = msg.getProperties();String transactionId = (String) headers.get("TRANSACTION_ID");RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());if (rocketmqTransactionLog != null) {return LocalTransactionState.COMMIT_MESSAGE;}return LocalTransactionState.ROLLBACK_MESSAGE;}
}

这里的事务代码类似非stream方式 实现RocketMQLocalTransactionListener里的两个方法。

不同点为:

获取header,非stream方式为 调用getHeaders()方法,stream方式为调用getProperties()方法。

RocketMQHeaders.TRANSACTION_ID这个常量在stream方式里没有了,使用字符串"TRANSACTION_ID"替换就行。

三、编写消费者

1.写配置

application.yml

spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:## 新版本固定格式  channel名字-{out/in}-{index}addBounsChannel-in-0:destination: add-bounsgroup: bouns-consumer-group

注意:bingdings下面的channel只能有一个in和一个out,不能配置多个in 多个out,否则会引起配置混乱。

2.写代码

消费者代码

package com.itmuch.usercenter.rocketmq;import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.itmuch.usercenter.service.user.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.function.Consumer;@Slf4j
@Configuration
public class AddBounsStreamConsumer {@Autowiredprivate UserService userService;@Beanpublic Consumer<UserAddBonusMsgDTO> addBounsChannel() {return message -> {log.info("addBounsChannel接到消息:{}", message);userService.addBonus(message);};}
}

注意:@Bean注解的方法名和yml里的channel名前半段保持一致

引用的 userService.addBonus

package com.itmuch.usercenter.service.user;import com.itmuch.usercenter.dao.user.BonusEventLogMapper;
import com.itmuch.usercenter.dao.user.UserMapper;
import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.entity.user.BonusEventLog;
import com.itmuch.usercenter.domain.entity.user.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class UserService {@Resourceprivate UserMapper userMapper;@Resourceprivate BonusEventLogMapper bonusEventLogMapper;@Transactional(rollbackFor = Exception.class)public void addBonus(UserAddBonusMsgDTO message) {log.info("消费消息 message ={}",message);//当收到消息的时候,执行的业务//1.为用户加积分Integer userId = message.getUserId();User user = userMapper.selectByPrimaryKey(userId);Integer bonus = message.getBonus();user.setBouns(user.getBouns() + bonus);userMapper.updateByPrimaryKeySelective(user);//2.记录日志到bounus_event_log表里面bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分..").build());log.info("积分添加完毕..");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/88455.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Cloud--@RefreshScope动态刷新的原理

原文网址&#xff1a;Spring Cloud--RefreshScope动态刷新的原理_IT利刃出鞘的博客-CSDN博客 简介 本文介绍Spring Cloud的RefreshScope动态刷新的原理。 原理概述 Spring的作用域有&#xff1a;single&#xff08;单例&#xff09;、prototype&#xff08;多例&#xff09…

python+nodejs+php+springboot+vue 导师双选系统

为了直观显示系统的功能&#xff0c;运用用例图这样的工具显示分析的结果。分析的导师功能如下。导师管理导师选择信息&#xff0c;管理项目&#xff0c;管理项目提交并对学员提交的项目进行指导。 为了直观显示系统的功能&#xff0c;运用用例图这样的工具显示分析的结果。分析…

漫谈:C语言 C++ 所有编程语言 =和==的麻烦

这次不只是C语言很麻拐&#xff0c;是所有编程语言都很麻拐了。 赋值和比较是编程语言最基本的操作之二&#xff0c;C和所有类C语言都使用“”和“”来分别表示赋值和比较。 数学上等号“”是个单一的概念&#xff0c;含义是“相等”&#xff0c;左右两边是等价的&#xff0c;很…

神仙打架!谷歌和OpenAI竞相推出多模式AI

原创 | 文 BFT机器人 随着秋季的到来&#xff0c;科技界正在展开另一场季节性活动——科技巨头谷歌和OpenAI正在竞相发布下一代多模态大语言模型&#xff0c;这些高级模型能够解释图像和文本&#xff0c;使他们能够执行诸如从草图生成网站代码或以文本形式描述视觉图表等任务。…

秦时明月沧海手游阵容推荐,秦时明月沧海角色强度

秦时明月沧海角色强度如何&#xff1f;在秦时明月沧海手游中&#xff0c;您可以从大量的角色卡牌中选择并发展&#xff0c;为了顺利通过各种副本&#xff0c;玩家们需要精心搭配阵容。那么&#xff0c;具体该如何配置最强的角色呢&#xff1f; 下面&#xff0c;小编将带各位玩家…

TypeScript学习大纲

TypeScript 是 JavaScript 的一个超集&#xff0c;它为 JavaScript 添加了静态类型系统。以下是一些必须了解的 TypeScript 基本知识点和特性&#xff1a; 基本类型: TypeScript 支持与 JavaScript 相同的基本类型&#xff0c;并提供了一些额外的类型选项。 let isDone: boolea…

怎么加密U盘文件?U盘文件加密软件哪个好?

当U盘中储存重要数据时&#xff0c;我们需要保护U盘文件安全&#xff0c;避免数据泄露。那么&#xff0c;怎么加密U盘文件呢&#xff1f;U盘文件加密软件哪个好呢&#xff1f; ​U盘数据怎么避免泄露&#xff1f; 想要避免U盘数据泄露&#xff0c;最佳的方法就是对U盘文件进行…

彻底弄懂js函数柯里化

彻底弄懂js函数柯里化 1、前言2、什么是柯里化3、实现原理4、应用场景4.1 参数复用4.2 遍历数组 1、前言 函数柯里化(Currying)在JavaScript中总感觉属于一种不温不火的存在&#xff0c;甚至有些开发者在提起柯里化时&#xff0c;竟然会有点生疏不懂。其实不然&#xff0c;对于…

关于埋点上报

一、埋点上报结构包含哪些&#xff1f; 埋点上报结构一般包含以下信息&#xff1a; 事件名称&#xff1a;标识上报的是哪个事件&#xff0c;例如“注册成功”或“点击按钮”等。事件发生时间&#xff1a;记录事件发生的时间戳。用户ID&#xff1a;标识事件所属的用户。设备信息…

Vue实现Hello World

<div id"aa"> <p>{{h}}</p> </div> <script src"https://cdn.jsdelivr.net/npm/vue2/dist/vue.js"></script> <script> const hello new Vue({ el:#aa, data:{ h : Hello World } }) </script>

Konva基本处理流程和相关架构设计

前言 canvas是使用JavaScript基于上下文对象进行2D图形的绘制的HTML元素&#xff0c;通常用于动画、游戏画面、数据可视化、图片编辑以及实时视频处理等方面。基于Canvas之上&#xff0c;诞生了例如 PIXI、ZRender、Fabric、Konva等 Canvas渲染引擎&#xff0c;兼顾易用的同时…

TCP协议与UDP协议

TCP&#xff08;传输控制协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;是两种常见的互联网传输协议&#xff0c;它们在数据传输方面有一些重要的区别&#xff1a; 连接性&#xff1a;TCP是面向连接的协议&#xff0c;而UDP是无连接的协议。这意味着在使用TCP进…

基于微信小程序的电影院订票系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言运行环境说明用户微信小程序端的主要功能有&#xff1a;管理员的主要功能有&#xff1a;具体实现截图详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考论文参考源码获取 前言 &#x1f497;博主介绍&…

ASCII码-对照表

ASCII 1> ASCII 控制字符2> ASCII 显示字符3> 常用ASCII码3.1> 【CR】\r 回车符3.2> 【LF】\n 换行符3.3> 不同操作系统&#xff0c;文件中换行 1> ASCII 控制字符 2> ASCII 显示字符 ASCII&#xff08;American Standard Code for Information Interc…

【计算机网络】IP协议

目录 前言 IP协议 基本概念 IP协议格式 分片 16位标识 3位标志与13位片偏移 分片流程 网段划分 网络号和主机号 DHCP协议 CIDR划分方案 特殊的ip地址 ip地址数量限制 私有ip地址与公网ip地址 路由转发 前言 我们前面讲了HTTP/HTTPS协议和TCP/…

ElementUI - 主页面--动态树右侧内容管理

一.左侧动态树 1.定义组件 ①样式&数据处理 <template><el-menu class"el-menu-vertical-demo" background-color"#334157"text-color"#fff" active-text-color"#ffd04b" :collapse"collapsed" router :def…

centos7通过docker搭建nginx+php环境

以下环境都是基于centos7.9完成。 1.安装docker yum install docker-ce 说明&#xff1a;这一步&#xff0c;由于centos软件仓库没有收纳docker&#xff0c;需要自己去官网爬文档安装。 安装完成之后&#xff0c;就是启动docker服务以及添加到开机启动。 systemctl enable do…

相乘(蓝桥杯)

相乘 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 小蓝发现&#xff0c;他将 1 至 1000000007 之间的不同的数与 2021 相乘后再求除以 1000000007 的余数&#xff0c;会得到不同的数。 小蓝想知道&#xff0c;能不能在 1 …

WordPress主题开发( 七)之—— 模版文件继承规则

WordPress主题开发&#xff08; 七&#xff09;之—— 模版文件继承规则 概述模板文件层次结构示例可视化概述层次结构详细信息主页显示首页显示单文章页面单页分类目录标签自定义分类自定义文章类型作者显示日期搜索结果404&#xff08;未找到&#xff09;附件嵌入功能非ASCII…

Spring Cloud Alibaba快速整合OpenFeign

文章目录 spring cloud alibaba 整合OpenFeign整合流程1.导入依赖2. 编写调用接口2.1 service&#xff08;这里写的是clients&#xff09;2.2 controller 3.设置其最大链接时间3.1 配置文件3.2 client3.3 接口3.4 被访问的controller spring cloud alibaba 整合OpenFeign Fore…