SpringBoot2.x Nacos RocketMQ 事务消息

需求背景:
现在有内容中心(content-center)和 用户中心(user-center)2个微服务,请求内容中心,发送消息给用户中心,完成为指定用户添加积分操作。

在这里插入图片描述

文章目录

          • 一、准备工作
            • 1. 版本对照
            • 2. 下载启动RocketMQ
            • 3. 引入maven依赖
          • 二、内容中心(服务端)
            • 2.1. 表结构设计
            • 2.2. 配置MQ信息
            • 2.3. 控制层
            • 2.4. service层
            • 2.5. RocketMQ 事务消息监听
          • 三、用户中心(客户端)
            • 3.1. 依赖
            • 3.2.配置
            • 3.3. 消息监听
            • 开源项目:

一、准备工作
1. 版本对照
RocketMQ 版本RocketMQ控制台版本RocketMQ starter版本
RocketMQ 4.8.0支持RocketMQ 4.8.02.2.0
2. 下载启动RocketMQ

linux 环境 RocketMQ 4.8.0 安装、部署控制台
https://blog.csdn.net/weixin_40816738/article/details/116269833

windows下RocketMQ下载、安装、部署、控制台
https://blog.csdn.net/weixin_40816738/article/details/115734482

3. 引入maven依赖
 <!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
二、内容中心(服务端)

消息发送端代码编写

2.1. 表结构设计

share分享表和rocketmq_transaction_logRocketMQ事务日志表2张表,
share

CREATE TABLE IF NOT EXISTS `share` (`id` INT NOT NULL AUTO_INCREMENT COMMENT 'id',`user_id` INT NOT NULL DEFAULT 0 COMMENT '发布人id',`title` VARCHAR(80) NOT NULL DEFAULT '' COMMENT '标题',`create_time` DATETIME NOT NULL COMMENT '创建时间',`update_time` DATETIME NOT NULL COMMENT '修改时间',`is_original` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否原创 0:否 1:是',`author` VARCHAR(45) NOT NULL DEFAULT '' COMMENT '作者',`cover` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '封面',`summary` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '概要信息',`price` INT NOT NULL DEFAULT 0 COMMENT '价格(需要的积分)',`download_url` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '下载地址',`buy_count` INT NOT NULL DEFAULT 0 COMMENT '下载数 ',`show_flag` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否显示 0:否 1:是',`audit_status` VARCHAR(10) NOT NULL DEFAULT 0 COMMENT '审核状态 NOT_YET: 待审核 PASSED:审核通过 REJECTED:审核不通过',`reason` VARCHAR(200) NOT NULL DEFAULT '' COMMENT '审核不通过原因',PRIMARY KEY (`id`))
ENGINE = InnoDB
COMMENT = '分享表';

rocketmq_transaction_logRocketMQ

-- -----------------------------------------------------
-- Table `rocketmq_transaction_log`
-- -----------------------------------------------------
create table rocketmq_transaction_log
(id             int auto_increment comment 'id'primary key,transaction_Id varchar(45) not null comment '事务id',log            varchar(45) not null comment '日志'
)comment 'RocketMQ事务日志表';

具体详情:见项目源码

2.2. 配置MQ信息
  • 项目内部yml配置
server:port: 8003
spring:application:# 应用名称name: ly-rockketmqprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
  • nacos服务端配置
# MQ name-server地址
rocketmq:name-server: 127.0.0.1:9876producer:#必须指定groupgroup: test-group
2.3. 控制层
package com.gblfy.lyrocketmq.controller;import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.lyrocketmq.entity.Share;
import com.gblfy.lyrocketmq.service.ShareService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/admin/shares")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {private final ShareService shareService;@PutMapping("/audit/{id}")public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {//TODO 认证授权return this.shareService.auditById(id, auditDTO);}
}
2.4. service层
package com.gblfy.lyrocketmq.service;import com.gblfy.api.RemoteProductService;
import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.common.dto.ShareDTO;
import com.gblfy.common.dto.UserAddBonusMsgDTO;
import com.gblfy.common.dto.UserDTO;
import com.gblfy.common.enums.AuditStatusEnum;
import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog;
import com.gblfy.lyrocketmq.entity.Share;
import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper;
import com.gblfy.lyrocketmq.mapper.ShareMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.util.Objects;
import java.util.UUID;@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareService {private final ShareMapper shareMapper;private final RemoteProductService userCenterFeignClient;private final RocketMQTemplate rocketMQTemplate;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;public ShareDTO findById(Integer id) {Share share = this.shareMapper.selectByPrimaryKey(id);Integer userId = share.getUserId();UserDTO userDTO = this.userCenterFeignClient.findById(userId);ShareDTO shareDTO = new ShareDTO();BeanUtils.copyProperties(share, shareDTO);//设置发布人shareDTO.setWxNickname(userDTO.getWxNickname());return shareDTO;}public Share auditById(Integer id, ShareAuditDTO auditDTO) {// 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常Share share = this.shareMapper.selectByPrimaryKey(id);if (share == null) {throw new IllegalArgumentException("参数非法!该分享不存在!");}if (!Objects.equals("NOT_YET", share.getAuditStatus())) {throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");}//----------------------------------------发送半消息----------------------------------------// 3. 如果是PASS,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {//消息idString transactionId = UUID.randomUUID().toString();this.rocketMQTemplate.sendMessageInTransaction("tx-add-bonus-group",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()// Header有妙用).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("share_id", id).build(),//arg有大用处auditDTO);} else {this.auditByIdInDB(id, auditDTO);}return share;}/*** 审批** @param id* @param auditDTO*/public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason()).build();this.shareMapper.updateByPrimaryKeySelective(share);}@Transactional(rollbackFor = Exception.class)public void auditByIdWithRoketMqlog(Integer id, ShareAuditDTO auditDTO, String transactionId) {this.auditByIdInDB(id, auditDTO);this.rocketmqTransactionLogMapper.insertSelective(RocketmqTransactionLog.builder().transactionId(transactionId).log("审核分享..").build());}
}
2.5. RocketMQ 事务消息监听
package com.gblfy.lyrocketmq.listener;import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog;
import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper;
import com.gblfy.lyrocketmq.service.ShareService;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {private final ShareService shareService;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;/*** 执行本地事务** @param msg 消息header信息* @param arg 消息体* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer share_id = Integer.valueOf((String) headers.get("share_id"));try {this.shareService.auditByIdWithRoketMqlog(share_id, (ShareAuditDTO) arg, transactionId);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事务的检查,检查本地事务是否成功** @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());if (rocketmqTransactionLog != null) {return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;}
}

详细见源码:本文底部

三、用户中心(客户端)

消息消费端代码编写

3.1. 依赖
 <!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
3.2.配置
  • 项目内部yml配置
server:port: 9000
spring:application:# 应用名称name: ly-productprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
  • nacos服务端配置
rocketmq:name-server: 127.0.0.1:9876
3.3. 消息监听
package com.gblfy.product.listenner;import com.gblfy.common.dto.UserAddBonusMsgDTO;
import com.gblfy.product.entity.BonusEventLog;
import com.gblfy.product.entity.User;
import com.gblfy.product.mapper.BonusEventLogMapper;
import com.gblfy.product.mapper.UserMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@RocketMQMessageListener(topic = "tx-add-bonus-group", consumerGroup = "consumer-group")
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {private final UserMapper userMapper;private final BonusEventLogMapper bonusEventLogMapper;@Overridepublic void onMessage(UserAddBonusMsgDTO message) {// 1. 为用户添加积分Integer userId = message.getUserId();Integer bonus = message.getBonus();User user = this.userMapper.selectByPrimaryKey(userId);user.setBonus(user.getBonus() + bonus);this.userMapper.updateByPrimaryKeySelective(user);// 2.记录日志到bonus_event_log表中this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分...").build());log.info("积分添加完毕...");}
}
开源项目:

https://gitee.com/gb_90/micro-service-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/518314.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云宣布3年再投2000亿

4月20日&#xff0c;阿里云宣布&#xff1a;未来3年再投2000亿&#xff0c;用于云操作系统、服务器、芯片、网络等重大核心技术研发攻坚和面向未来的数据中心建设。 近期&#xff0c;谷歌、美团等相继关闭或收缩云计算业务。在当前经济形势下&#xff0c;阿里云是否会缩减投入&…

JavaScript-Map和Set

ES6新特性 Map var map new Map([[wang, 23],[ht, 22],[test,[1,2,3,yy]],[3,test num]]) // 通过 key获取 value var test1 map.get(test); var num map.get(3); console.log(test1) console.log(num) // 添加新的 键值对 map.set(admin, 123456) console.log(map) // 修…

MaxCompute技术人背后的故事:从ApacheORC到AliORC

2019大数据技术公开课第一季《技术人生专访》来袭&#xff0c;本季将带领开发者们探讨大数据技术&#xff0c;分享不同国家的工作体验。本文整理自阿里巴巴计算平台事业部高级技术专家吴刚的专访&#xff0c;将为大家介绍Apache ORC开源项目、主流的开源列存格式ORC和Parquet的…

JavaScript-Iterable迭代

Iterable ES6新特性 遍历数组 // for of 打印值 &#xff0c; for in 打印下标 var arr [4,5,6] for (const number of arr) {console.log(number) }遍历Map var map new Map([[whl,100],[ht,110],[other,0]]) for (let x of map) {console.log(x)console.log(x[0])consol…

阿里小程序亮相2019上海云峰会:大生态促成许多“小而美”

7月25日下午&#xff0c;在上海世博中心的阿里云峰会上海站上&#xff0c;阿里巴巴小程序繁星计划以展区加开放式论坛形式&#xff0c;与各领域开发者、企业和生态合作伙伴充分交流了小程序一云多端的规划和进展&#xff0c;以及阿里系各端APP向小程序开放的资源和能力。 与会者…

快速验证业务决策,“玩转”用户增长

背景 闲鱼目前已经是国内最大的闲置物品交易平台&#xff0c;每天都有数以千万计的用户过来闲鱼&#xff0c;以C2C交易为主。在闲鱼里面&#xff0c;用户的C2C购物频率其实是很低的&#xff0c;而纯粹地逛商品feed流是一件挺无聊的事情。在业务上做加法&#xff0c;突破闲鱼用…

JavaScript-函数

函数 定义函数 定义方式一 绝对值函数 function abs(x) {if (x>0){return x;}else{return -x;} }一旦执行到return 代表函数结束&#xff0c;返回结果&#xff01; 如果没有执行return&#xff0c;函数执行完也会返回结果&#xff0c;结果就是NaN / undefined 定义方式二…

领航智变时代 2020 NAVIGATE领航者峰会云上起航

4月20日&#xff0c;由紫光集团和旗下新华三集团主办的2020 NAVIGATE领航者峰会首次全面移师线上&#xff0c;盛大启航。本次线上峰会从4月20日到25日持续6天&#xff0c;以“智变”为主题&#xff0c;通过33个专题&#xff0c;超过120场演讲&#xff0c;聚焦探索智能时代的智与…

在阿里,我如何做好技术项目管理?

阿里妹导读&#xff1a;在技术公司、尤其是互联网公司&#xff0c;技术人员作为PM(项目经理)是非常常见的。有些同学得心应手&#xff0c;有条不紊&#xff0c;能得到清晰稳定的预期结果&#xff1b;有些同学则在过程中遇到各种闹心的事&#xff0c;最后不是项目上不了线&#…

云原生化的迁云实战

云原生的时代已经到来&#xff0c;云原生技术正在重塑整个软件生命周期&#xff0c;阿里巴巴是国内最早布局云原生技术的公司之一。 容器服务团队在过去的几年时间内帮助很多用户成功把业务云原生化并迁移上云&#xff0c;其中有现在已经是我们TOP10的大客户&#xff0c;也有需…

超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践

来源 | Alice菌责编 | Carol封图 | CSDN 下载于视觉中国相信很多小伙伴已经接触过 SparkStreaming 了&#xff0c;理论就不讲太多了&#xff0c;今天的内容主要是为大家带来的是 SparkStreaming 整合 Kafka 的教程。文中含代码&#xff0c;感兴趣的朋友可以复制动手试试&#…

PerfDog-移动端性能测试-基本使用

常见的腾讯性能测试工具&#xff1a;腾讯gt、腾讯wetest、腾讯perfdog 腾讯perfdog&#xff1a; https://perfdog.qq.com/ 一、介绍&#xff1a; 移动全平台iOS/Android性能测试、分析工具平台。快速定位分析性能问题&#xff0c;提升APP应用及游戏性能和品质。手机无需ROOT/越…

网易云音乐的消息队列改造之路

十年文案老司机&#xff0c;不如网易评论区。 网易云音乐自2013年上线后&#xff0c;业务保持了高速增长。云音乐除了提供好听的音乐外&#xff0c;还留下了我们在乐和人上的美好回忆。本文整理自网易云音乐消息队列负责人林德智在近期 Apache Flink&RocketMQ Meetup 上海…

OSS在线迁移服务剖析

在前迁移说明 目前由于 OSS 数据迁移服务涉及到对目标的 OSS 要有很多 action 的 API 授权&#xff0c;为避免用户产生过多的学习成本&#xff0c;我们直接强制使用主账号进行迁移&#xff1b;该服务正在公测中&#xff0c;目前仍在免费使用阶段&#xff1b;服务使用需要提前工…

JavaScript-变量的作用域 、const、let

作用域 局部函数 在javascript中&#xff0c;var定义变量实际是有作用域的。 假设在函数体中声明&#xff0c;则在函数体外不可以使用~&#xff08;如果非要使用的话&#xff0c;可以用闭包&#xff09; function qj() {var x 1;x x 1; } x x 2; // Uncaught Referenc…

idea 开启Run DashBoard

文章目录1.项目的.idea文件夹下&#xff0c;打开workspace.xml文件2. 添加 RunDashboard 节点&#xff1a;IDEA中&#xff0c;run dashboard是一个直观、方便好用的面板 1.项目的.idea文件夹下&#xff0c;打开workspace.xml文件 2. 添加 RunDashboard 节点&#xff1a; <co…

那些你不知道的 LVS 秘密!

作者 | 故事凌责编 | 郭芮近来在群里,看到大家说对lvskeepalived不太了解&#xff0c;我想我应该是有发言权的。自己本身就是运维出身&#xff0c;原来在京东物流的时候&#xff0c;lvskeepalived就是仓库物流在用的&#xff0c;踩了很多坑&#xff0c;只不过后来都上云了&…

从零到破万节点!支撑618大促背后的蚂蚁金服Kubernetes集群

2019年天猫618大促&#xff0c;蚂蚁金服首次在大促中对调度系统和技术栈全面应用Kubernetes&#xff0c;突破了Kubernetes单集群万节点的规模&#xff0c;总节点数达到数十万个&#xff0c;这是世界最大规模的 Kubernetes 集群之一&#xff0c;而这距离开发团队下载Kubernetes代…

MongoDB 定位 oplog 必须全表扫描吗?

MongoDB oplog &#xff08;类似于 MySQL binlog&#xff09; 记录数据库的所有修改操作&#xff0c;除了用于主备同步&#xff1b;oplog 还能玩出很多花样&#xff0c;比如 全量备份 增量备份所有的 oplog&#xff0c;就能实现 MongoDB 恢复到任意时间点的功能通过 oplog&am…

JavaScript-方法

方法的定义 方法就是把函数放在对象里面 var wang {name: 网络,birth: 2020,// 方法age: function () {// 今年 - 出生的年var now_year new Date().getFullYear();return now_year-this.birth} } // 属性 wang.name // 方法&#xff0c;一定要带() kuangshen.age()拆开上面…