消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ

系列文章导航: Spring Cloud Alibaba微服务解决方案

常用MQ产品的选择

目前主流的MQ产品有kafka、RabbitMQ、ActiveMQ、RocketMQ等。在MQ选型时可以参照这篇文章选择合适的MQ产品。

RocketMQ及控制台搭建

RocketMQ的搭建可以参考这篇文章。

RocketMQ控制台的搭建可以参考这篇文章。

RocketMQ与Spring Boot整合

pom.xml中添加如下依赖

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.3

配置文件中添加如下配置

rocketmq:

name-server: 172.17.0.102:9876

producer:

# 构建rocketMQtemplate必须指定group

group: test-producer

生产者发送消息

@Slf4j

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull RocketMQTemplate rocketMQTemplate;

@Override

public Share aduitById(Integer id, ShareAuditDTO shareAuditDTO) {

//第一个参数为主题topic,第二个参数为消息体对象

rocketMQTemplate.convertAndSend("add-bonus",

UserAddBonusMsgDTO.builder()

.userId(share.getUserId())

.bonus(50).build());

return share;

}

}

发送后可在RocketMQ-Console控制台查看

b02e2477caf3176e24a358cddd22f885.png

消费者监听消息并进行业务处理

@Service

//在@RocketMQMessageListener注解中设置消费者group和监听主题topic

@RocketMQMessageListener(consumerGroup = "test-consumer", topic = "add-bonus")

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

//将监听的消息体对象指定为RocketMQListener类的泛型

public class AddBonusListener implements RocketMQListener{

@Override

public void onMessage(UserAddBonusMsgDTO userAddBonusMsgDTO) {

//处理业务逻辑

// do something ...

}

}

事务消息

事务消息会将发送的消息进行标记,在收到commit指令后才会进行消息投递。事务消息的执行流程如下图:

6df104356adba079161e8a10da96c147.png

这个方案有个前提,它假设消费者总是有能力成功处理消息。如果消费者消费失败,可以进行重试,如果依然失败,会进入死信队列。进入死信队列的消息可以重新入队,或者人工介入去处理。当然,也可以对消费失败的消息加入补偿机制,来保证数据的一致性。

发送半消息

@Slf4j

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull RocketMQTemplate rocketMQTemplate;

@Override

public String auditById(Integer id, ShareAuditDTO shareAuditDTO, UserAddBonusMsgDTO userAddBonusMsgDTO) {

String transactionId = UUID.randomUUID().toString();

rocketMQTemplate.sendMessageInTransaction(

//group

"tx-add-bonus-group",

//topic

"add-bonus",

//消息体

MessageBuilder.withPayload(userAddBonusMsgDTO)

//设置header,执行本地事务时可以获取使用

.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)

.setHeader("share_id", id).build()

,

//设置arg,执行本地事务时可以获取使用

shareAuditDTO

);

return "success";

}

}

本地事务执行及状态检查

//指定监听本地事务的group

@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class AddbonusTransactionListener implements RocketMQLocalTransactionListener {

private final @NonNull IShareService shareService;

private final @NonNull RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

//执行本地事务

MessageHeaders headers = message.getHeaders();

String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

Integer shareId = Integer.valueOf((String) headers.get("share_id"));

try {

//service方法中将生成的transactionId进行存储

shareService.auditByIdInDBWithRocketMQLog(shareId, (ShareAuditDTO) o, transactionId);

return RocketMQLocalTransactionState.COMMIT;

} catch (Exception e) {

return RocketMQLocalTransactionState.ROLLBACK;

}

}

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

//检查本地事务是否执行成功

String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

//通过检查本地事务日志记录,确认本地事务是否执行成功

RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(

RocketmqTransactionLog.builder()

.transactionId(transactionId).build()

);

if (rocketmqTransactionLog != null) {

return RocketMQLocalTransactionState.COMMIT;

}

return RocketMQLocalTransactionState.ROLLBACK;

}

}

RocketMQ与Spring Cloud Stream整合

添加依赖

com.alibaba.cloud

spring-cloud-starter-stream-rocketmq

yml添加配置

spring:

cloud:

stream:

rocketmq:

binder:

name-server: ${rocketmq.name-server}

bindings:

output:

destination: stream-test-topic

#名称需和@output注解中指定的名称一致

my-output:

#指定输出的topic

destination: stream-my-topic

#名称需和@input注解中指定的名称一致

my-input:

#指定接收的topic

destination: stream-my-topic

#如果整合RocketMQ,必须设置group

#如果整合其他MQ,可留空

group: binder-my-group

生产者

编写自定义Source接口,使用@Output注解指定消息管道的名称,需与yml配置文件中bindings下配置的管道名称一致,才能在IOC注入时获取到yml中指定的destination值做为发送的topic,如果不对应会默认使用@Output注解指定的值作为topic,自定义消费者Sink时同理。

public interface MySource {

String MY_OUTPUT = "my-output";

@Output(MY_OUTPUT)

MessageChannel output();

}

启动类注册自定义Source接口

@SpringBootApplication

@EnableBinding({Source.class, MySource.class})

public class ContentCenterApplication {

public static void main(String[] args) {

SpringApplication.run(ContentCenterApplication.class, args);

}

}

调用自定义Source接口发送消息

@RestController

@RequestMapping

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class TestController {

private final @NonNull MySource mySource;

@GetMapping("/test-stream")

public String testStream() {

mySource.output().send(MessageBuilder.withPayload("测试stream-2消息体").build());

return "success";

}

}

消费者

编写自定义Sink接口,使用@Input注解指定消息管道的名称,需与yml配置文件中bindings下配置的管道名称一致。

public interface MySink {

String MY_INPUT = "my-input";

@Input(MY_INPUT)

SubscribableChannel input();

}

启动类注册自定义Sink接口

@SpringBootApplication

@EnableBinding({Sink.class, MySink.class})

public class UserCenterApplication {

public static void main(String[] args) {

SpringApplication.run(UserCenterApplication.class, args);

}

}

使用@StreamListener注解指定自定义Sink监听消息并处理

@Service

@Slf4j

public class MyTestStreamConsumer {

@StreamListener(MySink.MY_INPUT)

public void receive(String messageBody) {

log.info("自定义接收器,通过stream收到消息:messageBody = {}", messageBody);

}

}

使用Source发送事务消息

使用Spring Cloud Stream发送RocketMQ的事务消息时,Source接口发送的消息无法在方法调用时指定事务消息的监听group,需在yml配置中进行设置

spring:

cloud:

stream:

rocketmq:

binder:

name-server: 172.17.0.102:9876

bindings:

#管道名称需与stream.bindings对应

output:

producer:

#标注为事务消息

transactional: true

#事务消息监听group名称,对应@RocketMQTransactionListener注解txProducerGroup属性

group: tx-add-bonus-group

bindings:

output:

destination: add-bonus

发送事务消息时,仅能够通过headers发送参数。

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull Source source;

@Override

public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {

//....

String transactionId = UUID.randomUUID().toString();

source.output().send(

MessageBuilder.withPayload(

UserAddBonusMsgDTO.builder()

.userId(share.getUserId())

.bonus(50).build())

//设置header,执行本地事务时可以获取使用

.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)

.setHeader("share_id", id)

.setHeader("dto", JSON.toJSONString(shareAuditDTO))

.build()

);

//....

return share;

}

}

Spring Cloud Stream消息过滤消费

参考这篇文章。

Spring Cloud Stream异常处理

参考这篇文章。

Spring Cloud Stream概念及注解

参考这篇文章。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/465052.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

低并发编程

大家好,我是闪客,感谢 写代码的篮球球痴 提供的平台让我在这里给大家介绍自己,这是我的公众号卡片。为了防止大家看到这里就点击了返回按钮,我先放一张图勾引一下您。这是我公众号做的第一张动图,好多读者当时说被这张…

Redhat的Linux产品版本AS/ES/WS的联系与区别

Redhat有两大Linux产品系列,其一是免费的Fedora Core系列主要用于桌面版本,提供了较多新特性的支持。另外一个产品系列是收费的Enterprise系列,这个系列分成:AS/ES/WS等分支,他们都是redhat企业级Linux,简称…

day34进程相关

进程1 什么是进程 进程指的是一个正在进行/运行的程序,进程是用来描述程序执行过程的虚拟概念 进程vs程序 程序:一堆代码 进程:程序的执行的过程 进程的概念起源于操作系统,进程是操作系统最核心的概念,操作系统其它所有的概念都是围绕进程来 操作系统理论: …

总结一些调试的心得,ES7243

这两天在调试一个与语音ADC芯片,也遇到了一些问题,到目前位置也解决了问题,所以想说一下嵌入式调试的一些心得,如果大家在调试设备的时候遇到问题,可以回头来看看这篇文章,可能会得到一些启发。我调试的系统…

电信无线网服务器是什么,怎样使用路由器共享电信天翼无线网络

准备工作:1.我们是四台笔记本共享:型号分别是联想thinkpad,联想非thinkpad,宏基,还有一台老爷dell(奔三 700MHZ 够老爷了吧?装的还是windows2000的系统)2. 路由器一台:腾达路由器,4孔的(TP-lin…

nls_lang.sh: 114: [[: not found

在ubuntu 10.10桌面版上安装oracle官网下载的oracle-xe-universal_10.2.0.1-1.0_i386.deb。 安装时需要加大swap分区的大小到1G以上,可以用这些命令增加dd if/dev/zero of/tmpswap bs1M count200 mkswap /tmpswap swapon /tmpswap 然后配置,在运行/etc/…

web存储机制localStorage和sessionStorage

https://www.cnblogs.com/yaoyuqian/p/7901052.html web存储包括两种:sessionStorage 和 localStorage(都是限定在文档源级别,非同源文档间无法共享) 1.sessionStorage 数据放在服务器上(IE不支持)严格用于…

“元宇宙” 是什么东西?

最近元宇宙的概念很火,所以转发一篇文章给大家看看。每当一个新东西出来的时候,有的人觉得这个是个好东西,也有人嗤之以鼻,觉得这个就是用来割韭菜的。就拿比特币来说,比特币有什么价值?他的价值无非就是操…

分布式系统服务器要求,浅谈分布式系统

分布式系统的由来软件系统的架构一直以来随着技术的发展和市场的需求进行着不断的演进。最初,各行业业务相对比较简单,对系统的要求也不高,软件系统的架构均采用单一应用架构,此时单台服务器即可满足系统的要求。之后,…

OCP Java 自测

一个朋友准备去考OCP Java认证,即原来的SCJP。心血来潮也想测测自己什么水平。找了本McGraw.Hill.OCP.Java.SE.6.Programmer.Practice.Exams,开盘就是两套自测题。14个题目,给了42分钟,按书中说法是过了8个就可以去考了。掐上秒表…

内核该怎么学?Linux进程管理工作原理(代码演示)

前言:Linux内核里大部分都是C语言。建议先看《Linux内核设计与实现(Linux Kernel Development)》,Robert Love,也就是LKD。Linux是一种动态系统,能够适应不断变化的计算需求。Linux计算需求的表现是以进程的通用抽象为中心的。进程可以是短期…

个人博客开通

点此链接 欢迎来访 ---by wolf96转载于:https://www.cnblogs.com/zhanlang96/p/9610864.html

如果访问云服务器上的文件,如果访问云服务器上的文件

如果访问云服务器上的文件 内容精选换一换WinSCP工具可以实现在本地与远程计算机之间安全地复制文件。与使用FTP上传代码相比,通过 WinSCP 可以直接使用服务器账户密码访问服务器,无需在服务器端做任何配置。通常本地Windows计算机将文件上传至Linux服务…

solaris下用户不能正常登录CDE界面的解决办法

使用过solaris系统的人,有时可能会碰到以普通用户不能正常登录CDE界面的问题,或者输入用户名和口令后,又回到登录框的现象,针对这些问题,建议按下面的步骤进行操作: 1.无法正常登录可能是由于CDE的不正确设…

int *p = *******a是什么鬼?

这是在朋友圈里面看到有人调侃的一个C语言题目,这里拿出来分享给大家看看。1我们知道int a 120; int* p &a;这样我们可以给指针p赋值。指针很多初学者学习的时候会觉得一脸懵逼,我们只要明白几个关键的东西,会让我们对指针理解更深入一…

[GO]匿名字段

package mainimport ("fmt" )type Person struct {name stringsex byteage int }type Student struct {Person //只有名字,没有字段,这里student相当于继承了person的所有字段,就有点像是继承了id intaddr string }func main() …

你见过的MCU最高GPIO翻转频率是多少?

大家好,我是痞子衡,是正经搞技术的痞子。今天痞子衡给大家介绍的是i.MXRT1010上的普通GPIO与高速GPIO极限翻转频率。上一篇文章 《聊聊i.MXRT1xxx上的普通GPIO与高速GPIO差异及其用法》,痞子衡从原理上介绍了 i.MXRT1xxx 系列里普通 GPIO 和 …

django中的admin组件之自定义组件的增删改查的完善

昨天我们将自定义列放在类我们自定义的Bookconfig配置类内,但是这样就写死了,因为当我们访问publish表的时候应该也有这样的自定义列,所以我们应该将我们的自定义列放在默认的配置表里面。应该怎么做? 当我们的自定义列挪到默认配…

“制造商和技术支持商”

1.用优化工具。 2.system32中的OEMINFO.ini和OEMLOGO.bmp文件 转载于:https://blog.51cto.com/honglingjin2011/537680

青春是一列不再回头的火车…

高中那年,我表姐对我说:“不要老想着出去打工赚钱,好好读书,将来肯定有用,也不要想着现在日子长得很,等你像我这样结婚生子后,一天一眨眼就过完了。”当时听了没有多大感觉,如今深以…