需求背景:
现在有内容中心(content-center)和 用户中心(user-center)2个微服务,请求内容中心,发送消息给用户中心,完成为指定用户添加积分操作。
文章目录
- 一、准备工作
- 1. 版本对照
- 2. 下载启动RocketMQ
- 3. 引入maven依赖
- 二、内容中心(服务端)
- 2.1. 表结构设计
- 2.2. 配置MQ信息
- 2.3. 控制层
- 2.4. service层
- 2.5. RocketMQ 事务消息监听
- 三、用户中心(客户端)
- 3.1. 依赖
- 3.2.配置
- 3.3. 消息监听
- 开源项目:
一、准备工作
1. 版本对照
RocketMQ 版本 | RocketMQ控制台版本 | RocketMQ starter版本 |
---|---|---|
RocketMQ 4.8.0 | 支持RocketMQ 4.8.0 | 2.2.0 |
2. 下载启动RocketMQ
linux 环境 RocketMQ 4.8.0 安装、部署控制台
https://blog.csdn.net/weixin_40816738/article/details/116269833
windows下RocketMQ下载、安装、部署、控制台
https://blog.csdn.net/weixin_40816738/article/details/115734482
3. 引入maven依赖
<!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
二、内容中心(服务端)
消息发送端代码编写
2.1. 表结构设计
share分享表和rocketmq_transaction_logRocketMQ事务日志表2张表,
share
CREATE TABLE IF NOT EXISTS `share` (`id` INT NOT NULL AUTO_INCREMENT COMMENT 'id',`user_id` INT NOT NULL DEFAULT 0 COMMENT '发布人id',`title` VARCHAR(80) NOT NULL DEFAULT '' COMMENT '标题',`create_time` DATETIME NOT NULL COMMENT '创建时间',`update_time` DATETIME NOT NULL COMMENT '修改时间',`is_original` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否原创 0:否 1:是',`author` VARCHAR(45) NOT NULL DEFAULT '' COMMENT '作者',`cover` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '封面',`summary` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '概要信息',`price` INT NOT NULL DEFAULT 0 COMMENT '价格(需要的积分)',`download_url` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '下载地址',`buy_count` INT NOT NULL DEFAULT 0 COMMENT '下载数 ',`show_flag` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否显示 0:否 1:是',`audit_status` VARCHAR(10) NOT NULL DEFAULT 0 COMMENT '审核状态 NOT_YET: 待审核 PASSED:审核通过 REJECTED:审核不通过',`reason` VARCHAR(200) NOT NULL DEFAULT '' COMMENT '审核不通过原因',PRIMARY KEY (`id`))
ENGINE = InnoDB
COMMENT = '分享表';
rocketmq_transaction_logRocketMQ
-- -----------------------------------------------------
-- Table `rocketmq_transaction_log`
-- -----------------------------------------------------
create table rocketmq_transaction_log
(id int auto_increment comment 'id'primary key,transaction_Id varchar(45) not null comment '事务id',log varchar(45) not null comment '日志'
)comment 'RocketMQ事务日志表';
具体详情:见项目源码
2.2. 配置MQ信息
- 项目内部yml配置
server:port: 8003
spring:application:# 应用名称name: ly-rockketmqprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
- nacos服务端配置
# MQ name-server地址
rocketmq:name-server: 127.0.0.1:9876producer:#必须指定groupgroup: test-group
2.3. 控制层
package com.gblfy.lyrocketmq.controller;import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.lyrocketmq.entity.Share;
import com.gblfy.lyrocketmq.service.ShareService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/admin/shares")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {private final ShareService shareService;@PutMapping("/audit/{id}")public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {//TODO 认证授权return this.shareService.auditById(id, auditDTO);}
}
2.4. service层
package com.gblfy.lyrocketmq.service;import com.gblfy.api.RemoteProductService;
import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.common.dto.ShareDTO;
import com.gblfy.common.dto.UserAddBonusMsgDTO;
import com.gblfy.common.dto.UserDTO;
import com.gblfy.common.enums.AuditStatusEnum;
import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog;
import com.gblfy.lyrocketmq.entity.Share;
import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper;
import com.gblfy.lyrocketmq.mapper.ShareMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.util.Objects;
import java.util.UUID;@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareService {private final ShareMapper shareMapper;private final RemoteProductService userCenterFeignClient;private final RocketMQTemplate rocketMQTemplate;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;public ShareDTO findById(Integer id) {Share share = this.shareMapper.selectByPrimaryKey(id);Integer userId = share.getUserId();UserDTO userDTO = this.userCenterFeignClient.findById(userId);ShareDTO shareDTO = new ShareDTO();BeanUtils.copyProperties(share, shareDTO);//设置发布人shareDTO.setWxNickname(userDTO.getWxNickname());return shareDTO;}public Share auditById(Integer id, ShareAuditDTO auditDTO) {// 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常Share share = this.shareMapper.selectByPrimaryKey(id);if (share == null) {throw new IllegalArgumentException("参数非法!该分享不存在!");}if (!Objects.equals("NOT_YET", share.getAuditStatus())) {throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");}//----------------------------------------发送半消息----------------------------------------// 3. 如果是PASS,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {//消息idString transactionId = UUID.randomUUID().toString();this.rocketMQTemplate.sendMessageInTransaction("tx-add-bonus-group",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()// Header有妙用).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("share_id", id).build(),//arg有大用处auditDTO);} else {this.auditByIdInDB(id, auditDTO);}return share;}/*** 审批** @param id* @param auditDTO*/public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason()).build();this.shareMapper.updateByPrimaryKeySelective(share);}@Transactional(rollbackFor = Exception.class)public void auditByIdWithRoketMqlog(Integer id, ShareAuditDTO auditDTO, String transactionId) {this.auditByIdInDB(id, auditDTO);this.rocketmqTransactionLogMapper.insertSelective(RocketmqTransactionLog.builder().transactionId(transactionId).log("审核分享..").build());}
}
2.5. RocketMQ 事务消息监听
package com.gblfy.lyrocketmq.listener;import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog;
import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper;
import com.gblfy.lyrocketmq.service.ShareService;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {private final ShareService shareService;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;/*** 执行本地事务** @param msg 消息header信息* @param arg 消息体* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer share_id = Integer.valueOf((String) headers.get("share_id"));try {this.shareService.auditByIdWithRoketMqlog(share_id, (ShareAuditDTO) arg, transactionId);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事务的检查,检查本地事务是否成功** @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());if (rocketmqTransactionLog != null) {return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;}
}
详细见源码:本文底部
三、用户中心(客户端)
消息消费端代码编写
3.1. 依赖
<!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
3.2.配置
- 项目内部yml配置
server:port: 9000
spring:application:# 应用名称name: ly-productprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
- nacos服务端配置
rocketmq:name-server: 127.0.0.1:9876
3.3. 消息监听
package com.gblfy.product.listenner;import com.gblfy.common.dto.UserAddBonusMsgDTO;
import com.gblfy.product.entity.BonusEventLog;
import com.gblfy.product.entity.User;
import com.gblfy.product.mapper.BonusEventLogMapper;
import com.gblfy.product.mapper.UserMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@RocketMQMessageListener(topic = "tx-add-bonus-group", consumerGroup = "consumer-group")
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {private final UserMapper userMapper;private final BonusEventLogMapper bonusEventLogMapper;@Overridepublic void onMessage(UserAddBonusMsgDTO message) {// 1. 为用户添加积分Integer userId = message.getUserId();Integer bonus = message.getBonus();User user = this.userMapper.selectByPrimaryKey(userId);user.setBonus(user.getBonus() + bonus);this.userMapper.updateByPrimaryKeySelective(user);// 2.记录日志到bonus_event_log表中this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分...").build());log.info("积分添加完毕...");}
}
开源项目:
https://gitee.com/gb_90/micro-service-parent