本文用到的版本
spring-cloud-stream 3.2.6
rocketmq-client 4.9.4
spring-cloud-starter-stream-rocketmq 2021.0.5.0
一、依赖导入
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency>
这个版本不再需要引入rocketmq-spring-boot-starter这个依赖
这个版本的stream不支持@EnableBinding注解,这个版本的rocketmq不支持txProducerGroup参数。
二、编写生产者
1.写配置
application.yml增加如下配置
spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:addBounsChannel-out-0:producer:producerType: TranstransactionListener: addBounsStreamTransactionListenerbindings:## 新版本固定格式 channel名字-{out/in}-{index}addBounsChannel-out-0:destination: add-bounsgroup: bouns-producer-group
这里事务的配置参考官方文档:https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new.adoc
注意:bingdings下面的channel只能有一个in和一个out,不能配置多个in 多个out,否则会引起配置混乱。
注意:事务配置新旧版本有变化
旧版为
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
新版为
spring.cloud.stream.rocketmq.bindings.output2.producer.producerType=Trans
如果不确定版本,可以直接查看下面这个类的属性。com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties
2.写代码
发送代码
@Autowiredprivate StreamBridge streamBridge;String transactionId = UUID.randomUUID().toString();streamBridge.send("addBounsChannel-out-0",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()).setHeader("TRANSACTION_ID", transactionId).setHeader("share_id", id).setHeader("dto", JSON.toJSONString(auditDTO)).build()
send的第一个参数与yml里的channel名保持一致
事务代码
package com.itmuch.contentcenter.rocketmq;import com.alibaba.fastjson.JSON;
import com.itmuch.contentcenter.dao.content.RocketmqTransactionLogMapper;
import com.itmuch.contentcenter.domain.dto.content.ShareAuditDTO;
import com.itmuch.contentcenter.domain.entity.content.RocketmqTransactionLog;
import com.itmuch.contentcenter.service.content.ShareService;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Map;@Component
public class AddBounsStreamTransactionListener implements TransactionListener {@Autowiredprivate ShareService shareService;@Resourceprivate RocketmqTransactionLogMapper rocketmqTransactionLogMapper;@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {Map<String, String> headers = msg.getProperties();String transactionId = (String) headers.get("TRANSACTION_ID");Integer shareId = Integer.valueOf((String) headers.get("share_id"));ShareAuditDTO auditDTO = JSON.parseObject(headers.get("dto"), ShareAuditDTO.class);try {shareService.auditByIdInDBWithRocketMqLog(shareId, auditDTO, transactionId);return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Map<String, String> headers = msg.getProperties();String transactionId = (String) headers.get("TRANSACTION_ID");RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());if (rocketmqTransactionLog != null) {return LocalTransactionState.COMMIT_MESSAGE;}return LocalTransactionState.ROLLBACK_MESSAGE;}
}
这里的事务代码类似非stream方式 实现RocketMQLocalTransactionListener里的两个方法。
不同点为:
获取header,非stream方式为 调用getHeaders()方法,stream方式为调用getProperties()方法。
RocketMQHeaders.TRANSACTION_ID这个常量在stream方式里没有了,使用字符串"TRANSACTION_ID"替换就行。
三、编写消费者
1.写配置
application.yml
spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876bindings:## 新版本固定格式 channel名字-{out/in}-{index}addBounsChannel-in-0:destination: add-bounsgroup: bouns-consumer-group
注意:bingdings下面的channel只能有一个in和一个out,不能配置多个in 多个out,否则会引起配置混乱。
2.写代码
消费者代码
package com.itmuch.usercenter.rocketmq;import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.itmuch.usercenter.service.user.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.function.Consumer;@Slf4j
@Configuration
public class AddBounsStreamConsumer {@Autowiredprivate UserService userService;@Beanpublic Consumer<UserAddBonusMsgDTO> addBounsChannel() {return message -> {log.info("addBounsChannel接到消息:{}", message);userService.addBonus(message);};}
}
注意:@Bean注解的方法名和yml里的channel名前半段保持一致
引用的 userService.addBonus
package com.itmuch.usercenter.service.user;import com.itmuch.usercenter.dao.user.BonusEventLogMapper;
import com.itmuch.usercenter.dao.user.UserMapper;
import com.itmuch.usercenter.domain.dto.message.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.entity.user.BonusEventLog;
import com.itmuch.usercenter.domain.entity.user.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.Date;@Service
@Slf4j
public class UserService {@Resourceprivate UserMapper userMapper;@Resourceprivate BonusEventLogMapper bonusEventLogMapper;@Transactional(rollbackFor = Exception.class)public void addBonus(UserAddBonusMsgDTO message) {log.info("消费消息 message ={}",message);//当收到消息的时候,执行的业务//1.为用户加积分Integer userId = message.getUserId();User user = userMapper.selectByPrimaryKey(userId);Integer bonus = message.getBonus();user.setBouns(user.getBouns() + bonus);userMapper.updateByPrimaryKeySelective(user);//2.记录日志到bounus_event_log表里面bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分..").build());log.info("积分添加完毕..");}
}