消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ

系列文章导航: Spring Cloud Alibaba微服务解决方案

常用MQ产品的选择

目前主流的MQ产品有kafka、RabbitMQ、ActiveMQ、RocketMQ等。在MQ选型时可以参照这篇文章选择合适的MQ产品。

RocketMQ及控制台搭建

RocketMQ的搭建可以参考这篇文章。

RocketMQ控制台的搭建可以参考这篇文章。

RocketMQ与Spring Boot整合

pom.xml中添加如下依赖

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.3

配置文件中添加如下配置

rocketmq:

name-server: 172.17.0.102:9876

producer:

# 构建rocketMQtemplate必须指定group

group: test-producer

生产者发送消息

@Slf4j

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull RocketMQTemplate rocketMQTemplate;

@Override

public Share aduitById(Integer id, ShareAuditDTO shareAuditDTO) {

//第一个参数为主题topic,第二个参数为消息体对象

rocketMQTemplate.convertAndSend("add-bonus",

UserAddBonusMsgDTO.builder()

.userId(share.getUserId())

.bonus(50).build());

return share;

}

}

发送后可在RocketMQ-Console控制台查看

b02e2477caf3176e24a358cddd22f885.png

消费者监听消息并进行业务处理

@Service

//在@RocketMQMessageListener注解中设置消费者group和监听主题topic

@RocketMQMessageListener(consumerGroup = "test-consumer", topic = "add-bonus")

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

//将监听的消息体对象指定为RocketMQListener类的泛型

public class AddBonusListener implements RocketMQListener{

@Override

public void onMessage(UserAddBonusMsgDTO userAddBonusMsgDTO) {

//处理业务逻辑

// do something ...

}

}

事务消息

事务消息会将发送的消息进行标记,在收到commit指令后才会进行消息投递。事务消息的执行流程如下图:

6df104356adba079161e8a10da96c147.png

这个方案有个前提,它假设消费者总是有能力成功处理消息。如果消费者消费失败,可以进行重试,如果依然失败,会进入死信队列。进入死信队列的消息可以重新入队,或者人工介入去处理。当然,也可以对消费失败的消息加入补偿机制,来保证数据的一致性。

发送半消息

@Slf4j

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull RocketMQTemplate rocketMQTemplate;

@Override

public String auditById(Integer id, ShareAuditDTO shareAuditDTO, UserAddBonusMsgDTO userAddBonusMsgDTO) {

String transactionId = UUID.randomUUID().toString();

rocketMQTemplate.sendMessageInTransaction(

//group

"tx-add-bonus-group",

//topic

"add-bonus",

//消息体

MessageBuilder.withPayload(userAddBonusMsgDTO)

//设置header,执行本地事务时可以获取使用

.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)

.setHeader("share_id", id).build()

,

//设置arg,执行本地事务时可以获取使用

shareAuditDTO

);

return "success";

}

}

本地事务执行及状态检查

//指定监听本地事务的group

@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class AddbonusTransactionListener implements RocketMQLocalTransactionListener {

private final @NonNull IShareService shareService;

private final @NonNull RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

//执行本地事务

MessageHeaders headers = message.getHeaders();

String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

Integer shareId = Integer.valueOf((String) headers.get("share_id"));

try {

//service方法中将生成的transactionId进行存储

shareService.auditByIdInDBWithRocketMQLog(shareId, (ShareAuditDTO) o, transactionId);

return RocketMQLocalTransactionState.COMMIT;

} catch (Exception e) {

return RocketMQLocalTransactionState.ROLLBACK;

}

}

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

//检查本地事务是否执行成功

String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

//通过检查本地事务日志记录,确认本地事务是否执行成功

RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(

RocketmqTransactionLog.builder()

.transactionId(transactionId).build()

);

if (rocketmqTransactionLog != null) {

return RocketMQLocalTransactionState.COMMIT;

}

return RocketMQLocalTransactionState.ROLLBACK;

}

}

RocketMQ与Spring Cloud Stream整合

添加依赖

com.alibaba.cloud

spring-cloud-starter-stream-rocketmq

yml添加配置

spring:

cloud:

stream:

rocketmq:

binder:

name-server: ${rocketmq.name-server}

bindings:

output:

destination: stream-test-topic

#名称需和@output注解中指定的名称一致

my-output:

#指定输出的topic

destination: stream-my-topic

#名称需和@input注解中指定的名称一致

my-input:

#指定接收的topic

destination: stream-my-topic

#如果整合RocketMQ,必须设置group

#如果整合其他MQ,可留空

group: binder-my-group

生产者

编写自定义Source接口,使用@Output注解指定消息管道的名称,需与yml配置文件中bindings下配置的管道名称一致,才能在IOC注入时获取到yml中指定的destination值做为发送的topic,如果不对应会默认使用@Output注解指定的值作为topic,自定义消费者Sink时同理。

public interface MySource {

String MY_OUTPUT = "my-output";

@Output(MY_OUTPUT)

MessageChannel output();

}

启动类注册自定义Source接口

@SpringBootApplication

@EnableBinding({Source.class, MySource.class})

public class ContentCenterApplication {

public static void main(String[] args) {

SpringApplication.run(ContentCenterApplication.class, args);

}

}

调用自定义Source接口发送消息

@RestController

@RequestMapping

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class TestController {

private final @NonNull MySource mySource;

@GetMapping("/test-stream")

public String testStream() {

mySource.output().send(MessageBuilder.withPayload("测试stream-2消息体").build());

return "success";

}

}

消费者

编写自定义Sink接口,使用@Input注解指定消息管道的名称,需与yml配置文件中bindings下配置的管道名称一致。

public interface MySink {

String MY_INPUT = "my-input";

@Input(MY_INPUT)

SubscribableChannel input();

}

启动类注册自定义Sink接口

@SpringBootApplication

@EnableBinding({Sink.class, MySink.class})

public class UserCenterApplication {

public static void main(String[] args) {

SpringApplication.run(UserCenterApplication.class, args);

}

}

使用@StreamListener注解指定自定义Sink监听消息并处理

@Service

@Slf4j

public class MyTestStreamConsumer {

@StreamListener(MySink.MY_INPUT)

public void receive(String messageBody) {

log.info("自定义接收器,通过stream收到消息:messageBody = {}", messageBody);

}

}

使用Source发送事务消息

使用Spring Cloud Stream发送RocketMQ的事务消息时,Source接口发送的消息无法在方法调用时指定事务消息的监听group,需在yml配置中进行设置

spring:

cloud:

stream:

rocketmq:

binder:

name-server: 172.17.0.102:9876

bindings:

#管道名称需与stream.bindings对应

output:

producer:

#标注为事务消息

transactional: true

#事务消息监听group名称,对应@RocketMQTransactionListener注解txProducerGroup属性

group: tx-add-bonus-group

bindings:

output:

destination: add-bonus

发送事务消息时,仅能够通过headers发送参数。

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull Source source;

@Override

public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {

//....

String transactionId = UUID.randomUUID().toString();

source.output().send(

MessageBuilder.withPayload(

UserAddBonusMsgDTO.builder()

.userId(share.getUserId())

.bonus(50).build())

//设置header,执行本地事务时可以获取使用

.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)

.setHeader("share_id", id)

.setHeader("dto", JSON.toJSONString(shareAuditDTO))

.build()

);

//....

return share;

}

}

Spring Cloud Stream消息过滤消费

参考这篇文章。

Spring Cloud Stream异常处理

参考这篇文章。

Spring Cloud Stream概念及注解

参考这篇文章。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/465052.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

低并发编程

大家好,我是闪客,感谢 写代码的篮球球痴 提供的平台让我在这里给大家介绍自己,这是我的公众号卡片。为了防止大家看到这里就点击了返回按钮,我先放一张图勾引一下您。这是我公众号做的第一张动图,好多读者当时说被这张…

总结一些调试的心得,ES7243

这两天在调试一个与语音ADC芯片,也遇到了一些问题,到目前位置也解决了问题,所以想说一下嵌入式调试的一些心得,如果大家在调试设备的时候遇到问题,可以回头来看看这篇文章,可能会得到一些启发。我调试的系统…

web存储机制localStorage和sessionStorage

https://www.cnblogs.com/yaoyuqian/p/7901052.html web存储包括两种:sessionStorage 和 localStorage(都是限定在文档源级别,非同源文档间无法共享) 1.sessionStorage 数据放在服务器上(IE不支持)严格用于…

“元宇宙” 是什么东西?

最近元宇宙的概念很火,所以转发一篇文章给大家看看。每当一个新东西出来的时候,有的人觉得这个是个好东西,也有人嗤之以鼻,觉得这个就是用来割韭菜的。就拿比特币来说,比特币有什么价值?他的价值无非就是操…

分布式系统服务器要求,浅谈分布式系统

分布式系统的由来软件系统的架构一直以来随着技术的发展和市场的需求进行着不断的演进。最初,各行业业务相对比较简单,对系统的要求也不高,软件系统的架构均采用单一应用架构,此时单台服务器即可满足系统的要求。之后,…

OCP Java 自测

一个朋友准备去考OCP Java认证,即原来的SCJP。心血来潮也想测测自己什么水平。找了本McGraw.Hill.OCP.Java.SE.6.Programmer.Practice.Exams,开盘就是两套自测题。14个题目,给了42分钟,按书中说法是过了8个就可以去考了。掐上秒表…

内核该怎么学?Linux进程管理工作原理(代码演示)

前言:Linux内核里大部分都是C语言。建议先看《Linux内核设计与实现(Linux Kernel Development)》,Robert Love,也就是LKD。Linux是一种动态系统,能够适应不断变化的计算需求。Linux计算需求的表现是以进程的通用抽象为中心的。进程可以是短期…

如果访问云服务器上的文件,如果访问云服务器上的文件

如果访问云服务器上的文件 内容精选换一换WinSCP工具可以实现在本地与远程计算机之间安全地复制文件。与使用FTP上传代码相比,通过 WinSCP 可以直接使用服务器账户密码访问服务器,无需在服务器端做任何配置。通常本地Windows计算机将文件上传至Linux服务…

int *p = *******a是什么鬼?

这是在朋友圈里面看到有人调侃的一个C语言题目,这里拿出来分享给大家看看。1我们知道int a 120; int* p &a;这样我们可以给指针p赋值。指针很多初学者学习的时候会觉得一脸懵逼,我们只要明白几个关键的东西,会让我们对指针理解更深入一…

你见过的MCU最高GPIO翻转频率是多少?

大家好,我是痞子衡,是正经搞技术的痞子。今天痞子衡给大家介绍的是i.MXRT1010上的普通GPIO与高速GPIO极限翻转频率。上一篇文章 《聊聊i.MXRT1xxx上的普通GPIO与高速GPIO差异及其用法》,痞子衡从原理上介绍了 i.MXRT1xxx 系列里普通 GPIO 和 …

django中的admin组件之自定义组件的增删改查的完善

昨天我们将自定义列放在类我们自定义的Bookconfig配置类内,但是这样就写死了,因为当我们访问publish表的时候应该也有这样的自定义列,所以我们应该将我们的自定义列放在默认的配置表里面。应该怎么做? 当我们的自定义列挪到默认配…

“制造商和技术支持商”

1.用优化工具。 2.system32中的OEMINFO.ini和OEMLOGO.bmp文件 转载于:https://blog.51cto.com/honglingjin2011/537680

青春是一列不再回头的火车…

高中那年,我表姐对我说:“不要老想着出去打工赚钱,好好读书,将来肯定有用,也不要想着现在日子长得很,等你像我这样结婚生子后,一天一眨眼就过完了。”当时听了没有多大感觉,如今深以…

我和周立功的聊天

算起来,我和周工认识也有7年了,7年前我在中兴,偶然一次加了周工的微信,有一次年末,周立功在推广他们的示波器,广哥拉我进周立功的示波器技术支持群微信群,说是周工要给大家发红包。那时候&#…

Python 37 进程池与线程池 、 协程

一:进程池与线程池 提交任务的两种方式: 1、同步调用:提交完一个任务之后,就在原地等待,等任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行 2、异步调用:提…

在腾讯做嵌入式是怎么样的

昨天发朋友圈,是我帮忙同学拍的几张照片,自己觉得拍的不错,点赞的人还挺多的,就想着聊聊在腾讯做嵌入式软件开发的情况。我面试的BSP驱动开发工程师,入职后也从事这方面的事情,但是并不仅仅是BSP驱动。现在…

NFS无法启动根文件系统的解决

为了调试驱动,整了一天的NFS启动根文件系统出了各种问题,后来还是一一解决,不过还不太完美,因为不能使用交换机,我只能用PC和目标板直连,导致我上网很麻烦 无法挂载问题一: IP-Config: Cannot a…

聊聊身边的嵌入式,为什么老司机都爱后视镜

为什么老司机都爱后视镜有句话形容我们老司机的成长过程,叫:一年虎,二年狼,三年变成小绵羊。如果你不懂这句话的意思,证明你还不是一个合格的老司机。如果没和别人发生过亲(Gua)密(Ceng)接(Shi)触(Gu),算得…

HDU 2187 悼念512汶川大地震遇难同胞——老人是真饿了

http://acm.hdu.edu.cn/showproblem.php?pid2187 Problem Description时间:2008年5月16日(震后第4天)地点:汶川县牛脑寨人物:羌族老奶奶【转载整理】牛脑寨是一个全村600多人的羌族寨子,震后几天&#xff…

第六次周赛

失望…… B题其实很简单&#xff0c;完全有能力出&#xff0c;我和luyi陷在我一个错误的树DP模型中…… 其实就是几个简单的组合数&#xff0c;谁可以都会的问题。 PROBLEM B 1 #include <iostream>2 #include <algorithm>3 #include <cstring>4 #include &l…