干货|Spring Cloud Stream 体系及原理介绍

Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream) 本身内容很多,而且它还有很多外部的依赖,想要熟悉 SCS,必须要先了解 Spring Messaging 和 Spring Integration 这两个项目,接下来文章将从以下几点跟大家进行介绍:

  • 什么是 Spring Messaging;
  • 什么是 Spring Integration;
  • 什么是 SCS及其功能;

Spring Messaging

Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。

  • 比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header:

package org.springframework.messaging;
public interface Message<T> {T getPayload();MessageHeaders getHeaders();
}
  • 消息通道 MessageChannel 用于接收消息,调用 send 方法可以将消息发送至该消息通道中 :

@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}boolean send(Message<?> message, long timeout);
}

消息通道里的消息如何被消费呢?

  • 由消息通道的子接口可订阅的消息通道 SubscribableChannel 实现,被 MessageHandler 消息处理器所订阅:
public interface SubscribableChannel extends MessageChannel {boolean subscribe(MessageHandler handler);boolean unsubscribe(MessageHandler handler);
}
  • MessageHandler 真正地消费/处理消息:
@FunctionalInterface
public interface MessageHandler {void handleMessage(Message<?> message) throws MessagingException;
}

Spring Messaging 内部在消息模型的基础上衍生出了其它的一些功能,如:

  1. 消息接收参数及返回值处理:消息接收参数处理器 HandlerMethodArgumentResolver 配合 @Header@Payload 等注解使用;消息接收后的返回值处理器 HandlerMethodReturnValueHandler 配合 @SendTo 注解使用;
  2. 消息体内容转换器 MessageConverter
  3. 统一抽象的消息发送模板 AbstractMessageSendingTemplate
  4. 消息通道拦截器 ChannelInterceptor

Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。它提出了不少新的概念,包括消息的路由 MessageRoute、消息的分发 MessageDispatcher、消息的过滤 Filter、消息的转换 Transformer、消息的聚合 Aggregator、消息的分割 Splitter 等等。同时还提供了 MessageChannel 和MessageHandler 的实现,分别包括 DirectChannelExecutorChannelPublishSubscribeChannel 和MessageFilterServiceActivatingHandlerMethodInvokingSplitter 等内容。

首先为大家介绍几种消息的处理方式:

  • 消息的分割:

  • 消息的聚合:

  • 消息的过滤:

  • 消息的分发:

接下来,我们以一个最简单的例子来尝试一下 Spring Integration:

SubscribableChannel messageChannel = new DirectChannel(); // 1messageChannel.subscribe(msg -> { // 2System.out.println("receive: " + msg.getPayload());
});messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); // 3
  1. 构造一个可订阅的消息通道 messageChannel
  2. 使用 MessageHandler 去消费这个消息通道里的消息;
  3. 发送一条消息到这个消息通道,消息最终被消息通道里的 MessageHandler 所消费,最后控制台打印出: receive: msg from alibaba

DirectChannel 内部有个 UnicastingDispatcher 类型的消息分发器,会分发到对应的消息通道 MessageChannel 中,从名字也可以看出来,UnicastingDispatcher 是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy 负载均衡策略,默认只有轮询的实现,可以进行扩展。

我们对上段代码做一点修改,使用多个 MessageHandler 去处理消息:

SubscribableChannel messageChannel = new DirectChannel();messageChannel.subscribe(msg -> {System.out.println("receive1: " + msg.getPayload());
});messageChannel.subscribe(msg -> {System.out.println("receive2: " + msg.getPayload());
});messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

由于 DirectChannel 内部的消息分发器是 UnicastingDispatcher 单播的方式,并且采用轮询的负载均衡策略,所以这里两次的消费分别对应这两个 MessageHandler。控制台打印出:

receive1: msg from alibaba
receive2: msg from alibaba

既然存在单播的消息分发器 UnicastingDispatcher,必然也会存在广播的消息分发器,那就是 BroadcastingDispatcher,它被 PublishSubscribeChannel 这个消息通道所使用。广播消息分发器会把消息分发给所有的 MessageHandler

SubscribableChannel messageChannel = new PublishSubscribeChannel();messageChannel.subscribe(msg -> {System.out.println("receive1: " + msg.getPayload());
});messageChannel.subscribe(msg -> {System.out.println("receive2: " + msg.getPayload());
});messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

发送两个消息,都被所有的 MessageHandler 所消费。控制台打印:

receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba

Spring Cloud Stream

SCS与各模块之间的关系是:

  • SCS 在 Spring Integration 的基础上进行了封装,提出了 BinderBinding@EnableBinding@StreamListener 等概念;
  • SCS 与 Spring Boot Actuator 整合,提供了 /bindings/channels endpoint;
  • SCS 与 Spring Boot Externalized Configuration 整合,提供了 BindingPropertiesBinderProperties 等外部化配置类;
  • SCS 增强了消息发送失败的和消费失败情况下的处理逻辑等功能。
  • SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。

Binder 是提供与外部消息中间件集成的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了 RocketMQ Binder。

从图中可以看出,Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。我们来看一个最简单的使用 RocketMQ Binder 的例子,然后分析一下它的底层处理原理:

  • 启动类及消息的发送:
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // 1
public class SendAndReceiveApplication {public static void main(String[] args) {SpringApplication.run(SendAndReceiveApplication.class, args);}@Bean // 2public CustomRunner customRunner() {return new CustomRunner();}public static class CustomRunner implements CommandLineRunner {@Autowiredprivate Source source;@Overridepublic void run(String... args) throws Exception {int count = 5;for (int index = 1; index <= count; index++) {source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3}}}
}
  • 消息的接收:
@Service
public class StreamListenerReceiveService {@StreamListener(Sink.INPUT) // 4public void receiveByStreamListener1(String receiveMsg) {System.out.println("receiveByStreamListener: " + receiveMsg);}}

这段代码很简单,没有涉及到 RocketMQ 相关的代码,消息的发送和接收都是基于 SCS 体系完成的。如果想切换成 RabbitMQ 或 kafka,只需修改配置文件即可,代码无需修改。

我们分析这段代码的原理:

  1. @EnableBinding 对应的两个接口属性 Source 和 Sink 是 SCS 内部提供的。SCS 内部会基于 Source 和 Sink 构造 BindableProxyFactory,且对应的 output 和 input 方法返回的 MessageChannel 是 DirectChannel。output 和 input 方法修饰的注解对应的 value 是配置文件中 binding 的 name。
public interface Source {String OUTPUT = "output";@Output(Source.OUTPUT)MessageChannel output();
}
public interface Sink {String INPUT = "input";@Input(Sink.INPUT)SubscribableChannel input();
}

配置文件里 bindings 的 name 为 output 和 input,对应 Source 和 Sink 接口的方法上的注解里的 value:

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-groupspring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1
  1. 构造 CommandLineRunner,程序启动的时候会执行 CustomRunner 的 run 方法。
  2. 调用 Source 接口里的 output 方法获取 DirectChannel,并发送消息到这个消息通道中。这里跟之前 Spring Integration 章节里的代码一致。
  • Source 里的 output 发送消息到 DirectChannel 消息通道之后会被 AbstractMessageChannelBinder#SendingHandler 这个 MessageHandler 处理,然后它会委托给 AbstractMessageChannelBinder#createProducerMessageHandler 创建的 MessageHandler 处理(该方法由不同的消息中间件实现);
  • 不同的消息中间件对应的 AbstractMessageChannelBinder#createProducerMessageHandler 方法返回的 MessageHandler 内部会把 Spring Message 转换成对应中间件的 Message 模型并发送到对应中间件的 broker;
  1. 使用 @StreamListener 进行消息的订阅。请注意,注解里的 Sink.input 对应的值是 "input",会根据配置文件里 binding 对应的 name 为 input 的值进行配置:
  • 不同的消息中间件对应的 AbstractMessageChannelBinder#createConsumerEndpoint 方法会使用 Consumer 订阅消息,订阅到消息后内部会把中间件对应的 Message 模型转换成 Spring Message;
  • 消息转换之后会把 Spring Message 发送至 name 为 input 的消息通道中;
  • @StreamListener 对应的 StreamListenerMessageHandler 订阅了 name 为 input 的消息通道,进行了消息的消费;

这个过程文字描述有点啰嗦,用一张图总结一下(黄色部分涉及到各消息中间件的 Binder 实现以及 MQ 基本的订阅发布功能):

SCS 章节的最后,我们来看一段 SCS 关于消息的处理方式的一段代码:

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message msg) {System.out.println("receive by headers['index']=='1': " + msg);
}@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {System.out.println("receive Person: " + person);
}@StreamListener(value = Sink.INPUT)
public void receiveAllMsg(String msg) {System.out.println("receive allMsg by StreamListener. content: " + msg);
}@StreamListener(value = Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message msg) {System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}

有没有发现这段代码跟 Spring MVC Controller 中接收请求的代码很像? 实际上他们的架构都是类似的,Spring MVC 对于 Controller 中参数和返回值的处理类分别是 org.springframework.web.method.support.HandlerMethodArgumentResolver、 org.springframework.web.method.support.HandlerMethodReturnValueHandler

Spring Messaging 中对于参数和返回值的处理类之前也提到过,分别是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverorg.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler

它们的类名一模一样,甚至内部的方法名也一样。

总结

上图是 SCS 体系相关类说明的总结,关于 SCS 以及 RocketMQ Binder 更多相关的示例,可以参考 RocketMQ Binder Demos,包含了消息的聚合、分割、过滤;消息异常处理;消息标签、sql过滤;同步、异步消费等等。

下一篇文章,我们将分析消息总线(Spring Cloud Bus) 在 Spring Cloud 体系中的作用,并逐步展开,分析 Spring Cloud Alibaba 中的 RocketMQ Binder 是如何实现 Spring Cloud Stream 标准的。


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/519275.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里小程序云应用上线了,有哪些看点?

3月21日&#xff0c;在2019阿里云峰会北京上&#xff0c;阿里巴巴旗下的阿里云、支付宝、淘宝、钉钉、高德等联合发布“阿里巴巴小程序繁星计划”&#xff1a;提供20亿元补贴&#xff0c;扶持200万小程序开发者、100万商家。凡入选“超星”的小程序&#xff0c;入驻支付宝、淘宝…

10 个实用功能告诉你,谷歌云(Google Cloud)相对亚马逊云(AWS)有哪些优势?...

来源 | itnext编译 | 武明利责编 | Carol出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;有很多文章将谷歌云提供商&#xff08;GCP&#xff09;与亚马逊云服务&#xff08;AWS&#xff09;进行比较&#xff0c;但这篇文章并不想要做比较。作者主要是一个AW…

mybatis-plus大批量数据插入缓慢问题

文章目录问题排查结果建议问题 最近项目用的mybatis-plus做的映射&#xff0c;有个批处理文件内容的需求&#xff0c;在使用mybatis-plus的批处理方法saveBatch时发现速度特别慢&#xff0c;测试从1000到10000到80000条基本上是线性增加&#xff0c;80000条时差不多要90秒。 …

世界冠军之路:菜鸟车辆路径规划求解引擎研发历程

阿里妹导读&#xff1a;车辆路径规划问题&#xff08;Vehicle Routing Problem, VRP&#xff09;是物流领域最经典的优化问题之一&#xff0c;具有极大的学术研究意义和实际应用价值。菜鸟网络高级算法专家胡浩源带领仓配智能化算法团队经过两年的研发&#xff0c;逐步沉淀出了…

原来,阿里工程师才是隐藏的“修图高手”!

阿里妹导读&#xff1a;在现实世界中&#xff0c;信息通常以不同的模态同时出现。这里提到的模态主要指信息的来源或者形式。例如在淘宝场景中&#xff0c;每个商品通常包含标题、商品短视频、主图、附图、各种商品属性&#xff08;类目&#xff0c;价格&#xff0c;销量&#…

分布式数据集训营,从入门到精通,从理论到实践,你不可错过的精品课程!...

责编 | Carol出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;随着微服务、云化架构的兴起&#xff0c;分布式数据库开始在越来越多的场景得到应用&#xff0c;从外围系统到中台业务&#xff0c;再到核心交易业务&#xff0c;分布式数据库成为企业基础架构转…

mybatis批量插入10万条数据的优化过程

在使用mybatis插入大量数据的时候,为了提高效率,放弃循环插入,改为批量插入,mapper如下: package com.lcy.service.mapper;import com.lcy.service.pojo.TestVO; import org.apache.ibatis.annotations.Insert;import java.util.List;/*** 功能描述&#xff1a;** author liuc…

java spring注解维护,从一次工程启动失败谈谈 spring 注解

原标题&#xff1a;从一次工程启动失败谈谈 spring 注解檀宝权Java 后端开发工程师&#xff0c;负责度假 App 后端和广告后端开发维护工作&#xff0c;熟悉 Tomcat&#xff0c;Spring&#xff0c;Mybatis&#xff0c;会点 Python&#xff0c;Lua。一、背景线上环境升级成 JDK8后…

探索Java日志的奥秘:底层日志系统-log4j2

前言 log4j2是apache在log4j的基础上&#xff0c;参考logback架构实现的一套新的日志系统&#xff08;我感觉是apache害怕logback了&#xff09;。 log4j2的官方文档上写着一些它的优点&#xff1a; 在拥有全部logback特性的情况下&#xff0c;还修复了一些隐藏问题API 分离&…

大地震!某大厂“硬核”抢人,放话:只要AI人才,高中毕业都行!

特斯拉创始人马斯克&#xff0c;在2019年曾许下很多承诺&#xff0c;其中一个就是&#xff1a;2019年底实现完全的自动驾驶。虽然这个承诺又成了flag&#xff0c;但是不妨碍他今年继续为这个承诺努力。这不&#xff0c;就在上周一&#xff0c;马斯克之间在twitter上放话了&…

Dart编译技术在服务端的探索和应用

前言 最近闲鱼技术团队在FlutterDart的多端一体化的基础上&#xff0c;实现了FaaS研发模式。Dart吸取了其它高级语言设计的精华&#xff0c;例如Smalltalk的Image技术、JVM的HotSpot和Dart编译技术又师出同门。由Dart实现的语言容器&#xff0c;它可以在启动速度、运行性能有不…

Python + ElasticSearch:有了这个超级武器,你也可以报名参加诗词大会了! | 博文精选...

来源 | CSDN 博客作者 | 天元浪子责编 | Carol出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;意犹未尽的诗词大会正月十六&#xff0c;中国诗词大会第五季落下帷幕。从2016年2月12日第一季于开播&#xff0c;迄今恰好四周年。在这个舞台上&#xff0c;时年…

Node.js 应用故障排查手册 —— 大纲与常规问题指标简介

楔子 你是否想要尝试进行 Node.js 应用开发但是又总听人说它不安全、稳定性差&#xff0c;想在公司推广扩张大前端的能力范畴和影响又说服不了技术领导。 JavaScript 发展到今天&#xff0c;早已脱离原本浏览器的战场&#xff0c;借助于 Node.js 的诞生将其触角伸到了服务端、P…

蚂蚁金服CTO程立:做工程要有“拧螺丝”的精神

“一台机器可能有无数颗螺丝&#xff0c;需要一个一个地拧&#xff0c;而且需要一圈一圈地拧&#xff0c;才能让系统间严丝合缝&#xff0c;顺利工作。代码的世界里&#xff0c;一个项目到底成功与否&#xff0c;也是取决于几个模型的关键特殊设置&#xff0c;就像拧螺丝一样。…

linux 环境安装DBI和DBD_03

文章目录一、软件下载二、安装DBI2.1. DBI下载2.2. 解压2.3. 安装依赖2.4. 编译2.5. 执行测试2.6. 安装2.6. 修改权限三、安装DBD-ORACLE组件3.1. DBI下载3.2. 修改权限3.3. 切换用户3.4. 解压3.5. 进入目录3.6. 初始化环境变量3.6. 查看配置的环境变量是否配置3.7. 刷新配置文…

像数据科学家一样思考:12步指南(上)

介绍 目前&#xff0c;数据科学家正在受到很多关注&#xff0c;因此&#xff0c;有关数据科学的书籍正在激增。我看过很多关于数据科学的书籍&#xff0c;在我看来他们中的大多数更关注工具和技术&#xff0c;而不是数据科学中细微问题的解决。直到我遇到Brian Godsey的“像数…

Mybatis-plus 大数据量数据流式查询通用接口

文章目录一、案例需求二、使用案例&#xff1a;2.1. 自定义查询接口2.2. 逻辑处理类2.3. 调用案例2.4. 具体逻辑处理案例三、企业案例3.1. key名称获取3.2. 逻辑类测试3.3.最后一个批次处理方案四、 通用SQL预编译处理4.1. 业务场景4.2. xml形式4.3. 注解形式五、企业案例5.1. …

基于MaxCompute的数仓数据质量管理

声明 本文中介绍的非功能性规范均为建议性规范&#xff0c;产品功能无强制&#xff0c;仅供指导。 参考文献 《大数据之路——阿里巴巴大数据实践》——阿里巴巴数据技术及产品部 著。 背景及目的 数据对一个企业来说已经是一项重要的资产&#xff0c;既然是资产&#xff…

IP应用加速 – DCDN迈入全栈新篇章

4月11日&#xff0c;第七届"亚太内容分发大会"暨CDN峰会国际论坛中&#xff0c;阿里云资深技术专家姚伟斌发布了DCDN子产品IP应用加速&#xff08;IPA&#xff09;。IPA是基于阿里云CDN本身的资源优化&#xff0c;对传输层&#xff08;TCP&UDP&#xff09;协议进…

十年磨一剑,王坚自研的MaxCompute如何解决世界级算力难题

大数据时代&#xff0c;随着企业数据规模的急剧增长&#xff0c;传统软件已无法承载&#xff0c;这也推动了大数据技术的发展&#xff0c;Google、AWS、微软等硅谷巨头纷纷投入大数据技术的研发&#xff1b;而在国内&#xff0c;王坚也在十年前带领阿里云团队研发MaxCompute&am…