相信大家对消息通信架构以及各种消息中间件应该都不陌生。在分布式系统的设计和开发过程中,消息通信是用于实现系统解耦、提高扩展性的一大技术体系。而业界关于如何实现消息通信系统也有很多解决方案和对应的开发框架。不知道你有没有发现,在我们每天都在使用到Spring框架中,实际上也包含了一套完整而强大的消息通信机制,被广泛的应用在Spring家族的各个框架中,你可能在不经意中已经在使用这套机制了。今天的内容将围绕这点展开讨论,让你在应用这些框架时知其然更知其所以然。
在Spring家族中,与消息通信机制相关的框架有三个,分别是Spring Messaging、Spring Integration和Spring Cloud Steam。事实上,Spring Cloud中的Spring Cloud Stream是基于Spring Integration实现了消息发布和消费机制并提供了一层封装,很多关于消息发布和消费的概念和实现方法本质上都是依赖于Spring Integration。而在Spring Integration的背后,则依赖于Spring Messaging组件来实现消息处理机制的基础设施。这三个框架之间的依赖关系如下图所示。
接下来的内容,我们先来对位于底层的Spring Messaging和Spring Integration框架做一些展开,方便你在使用Spring Cloud Stream时对其背后的实现原理有更好的理解。
Spring Messaging
Spring Messaging是Spring基础框架中的一个底层模块,用于提供统一的消息编程模型。例如,消息这个数据单元在Spring Messaging中统一定义Message接口,包括一个消息头Header和一个消息体Payload。
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
而消息通道MessageChannel的定义也比较简单。
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message<?> message, long timeout);
}
可以看到,我们可以调用MessageChannel的send方法将消息发送至该消息通道中。
消息通道的概念比较抽象,可以简单把它理解为是对队列的一种抽象。通道的名称对应的就是队列的名称,但是作为一种抽象和封装,各个消息通信系统所特有的队列概念并不会直接暴露在业务代码中,而是通过通道来对队列进行配置。下图展示了这层抽象关系。
Spring Messaging把通道抽象成两种基本表现形式,即支持轮询的PollableChannel和实现发布-订阅模式的SubscribableChannel,这两个通道都继承自具有消息发送功能的MessageChannel。
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
我们注意到对于PollableChannel而言才有receive的概念,代表这是通过轮询操作主动获取消息的过程。而SubscribableChannel则是通过注册回调函数MessageHandler来实现事件响应。MessageHandler接口定义如下。
public interface MessageHandler {
void handleMessage(Message<?> message) throws MessagingException;
}
通过MessageHandler,我们就可以实现对消息的异步消费。
Spring Integration
Spring Integration是对Spring Messaging的扩展。在Spring Messaging的基础上,Spring Integration还实现了其他几种有用的通道。
这里列举了支持阻塞式队列的RendezvousChannel,该通道与带缓存的QueueChannel都属于点对点通道,但只有在前一个消息被消费之后才能发送下一个消息。PriorityChannel即优先级队列。而DirectChannel是Spring Integration的默认通道,该通道的消息发送和接收过程处于同一线程中。另外还有ExecutorChannel,使用基于多线程的TaskExecutor来异步消费通道中的消息。
Spring Integration的设计目的是为了系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。当各个异构系统之间进行集成时,如何屏蔽各种技术体系所带来的差异性,Spring Integration为我们提供了即插即用的解决方案。Spring Integration提供的常见集成端点包括HTTP、JDBC、JMS、AMQP、JPA、Mail、MongoDB、Redis等。
Spring Cloud Stream
Spring Cloud Stream是Spring Integration的一种增强,同时与Spring Boot体系进行了融合,也是Spring Cloud家族中专门用来实现微服务环境下消息通信的核心框架。我们先来看一下Spring Cloud Stream中与消息通信相关的内容。
Spring Cloud Stream中的通道
结合前面关于Spring Messaging和Spring Integration中各种概念的描述,我们就不难理解Spring Cloud Stream中关于Source和Sink的定义。Source和Sink都是接口,其中Source接口的定义是这样的。
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
注意到这里通过Spring Messaging提供的MessageChannel来发送消息,而MessageChannel类来自于Spring Messaging组件。我们在MessageChannel上添加了一个@Output注解,该注解定义了一个输出通道。
类似的,Sink接口定义如下所示。
public interface Sink{
String INPUT = "input";
@Input(Source.INPUT)
SubscribableChannel input();
}
同样,这里通过Spring Messaging中的SubscribableChannel来实现消息接收,而@Input注解定义了一个输入通道。
注意到@Input和@Output注解使用通道名称作为参数,如果没有名称,会使用带注解的方法名字作为参数,也就是默认情况下分别使用“input”和“output”作为通道名称。从这个角度讲,一个Spring Cloud Stream应用程序中的Input和Output通道数量和名称都是可以任意设置的,我们只需要在这些通道的定义上添加@Input和@Output注解即可,这里也给出相应的示例代码。
public interface CustomChannels{
@Output
MessageChannel output1();
@Input
SubscribableChannel input1();
@Input
SubscribableChannel input2();
}
可以看到,我们在这里定义了一个CustomChannels接口并声明了一个Output通道和两个Input通道,说明使用该通道的服务会从外部的两个通道中获取消息并向外部的一个通道发送消息。注意到上述接口同时使用到了Spring Messaging中的SubscribableChannel和MessageChannel。Spring Cloud Stream对Spring Messaging和Spring Integration提供了原生支持,我们可以直接使用这两个框架所提供的API直接操作消息发布和接收的过程。但在多数场景下,我们不需要依赖这种方式就能完成常见的开发需求。
Spring Cloud Stream整体架构
介绍完Spring Cloud Stream所提供的通道之后,我们来进一步分析它的整体架构。Spring Cloud Stream对整个消息发布和消费过程做了高度抽象,并提供了一系列核心组件。区别于直接使用RabbitMQ、Kafka等消息中间件,Spring Cloud Stream为开发人员提供了一套统一的API。相当于Spring Cloud Stream在消息生产者和消费者之间添加了一种桥梁机制,这种桥梁机制屏蔽了各种消息中间件之间的差异。
在上图中,我们不难看出Spring Cloud Stream具备四个核心组件,分别是Binder、Channel、Source和Sink,其中Binder和Channel成对出现,而Source和Sink分别面向消息的发布者和消费者。我们已经理解了Source、Sink和Channel的概念,这里重点对Binder进行展开。
Spring Cloud Stream中最重要的概念就是Binder。所谓Binder,顾名思义就是一种黏合剂,将业务服务与消息通信系统黏合在一起。通过Binder,我们可以很方便的连接消息中间件,可以动态的改变消息的目标地址、发送方式而不需要了解其背后的各种消息中间件在实现上的差异。到目前为止,Spring Cloud Stream通过Binder组件分别完成了对RabbitMQ以及Kafka的集成,我们可以基于Spring Cloud Stream所提供的统一的API来使用这两款消息中间件,而不需要具体处理它们在消息发布和消费上的差异。这点显著提高了广大开发人员的开发效率。
总结
我们知道软件设计和实现过程都应该采用分层的思想,而Spring框架针对消息通信的解决方案也采用了同样的设计思想。我们发现在Spring中,用于实现消息通信的组件和框架并不是只有一个,而是有三个。从提供消息、通道等基础概念的Spring Messaging开始,到用于构建系统集成总线的Spring Integration,最后才是具备平台型消息通信能力的Spring Cloud Stream。这三个框架各自的定位,以及功能的演进过程值得我们深入进行分析。