一、前言
接下来是开展一系列的 SpringCloud 的学习之旅,从传统的模块之间调用,一步步的升级为 SpringCloud 模块之间的调用,此篇文章为第九篇,即介绍 Stream 消息驱动。
二、消息驱动概念
2.1 消息驱动是什么
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
说的通俗一点就是:以前你的项目中可能既用到了 rabbitmq,又用到了 kafka,现在只需要使用一个 Spring Cloud Stream 就可以了。
2.2 官网
官网的地址在这,也可以访问它的中文指导手册,但是需要注意的是,目前仅支持 RabbitMQ 和 Kafka。
2.3 设计思想
2.3.1 标准 mq
对于标准的 mq 来说,架构如下图,生产者/消费者之间靠消息媒介传递信息内容,消息必须走特定的通道
2.3.2 Stream
比方说我们用到了 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上的不同,像 RabbitMQ 有 exchange,kafka 有 Topic 和 Partitions 分区。
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。
2.3.3 Stream 如何实现统一底层差异
Stream 通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离。
在没有绑定器这个概念的之前,我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
2.3.4 绑定器 Binder
通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder 可以生成 Binding,Binding 用来绑定消息容器的生产者和消费者,它有两种类型, INPUT 和 OUTPUT,INPUT 对应于消费者,OUTPUT 对应于生产者。
Stream 中的消息通信方式遵循了发布-订阅模式,在 RabbitMQ 就是 Exchange,在 Kakfa 中就是 Topic。
2.4 Stream 标准流程套路
1、Binder:很方便的连接中间件,屏蔽差异
2、Channel:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过 Channel 对队列进行配置。
3、Source 和 Sink:简单的可理解为参照对象是 Spring Cloud Stream 自身,从 Stream 发布消息就是输出,接受消息就是输入。
2.5 编码 API 和常用注解
组成 | 说明 |
Middleware | 中间件,目前只支持 Rabbitmq 和 Kafka |
Binder | Binder 是应用与消息中间件之间的封装,目前实现了 Kafka 和 Rabbitmq 的 Binder,通过 Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于 Kafka 的 topic,Rabbitmq 的 exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息的接收 |
@EnableBinding | 指信道 channel 和 exchange 绑定在一起 |
三、消息驱动之生产者
3.1 工程创建
新建一个 cloud-stream-rabbitmq-consumer8801 模块用来充当消息的生产者,pom.xml 内容如下所示,只有一个依赖比较特殊。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.springcloud</groupId><artifactId>SpringCloud</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>cloud-stream-rabbitmq-provider8801</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--引入stream整合rabbit的依赖,如果是kafka,则也引入对于的依赖即可--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--基础配置--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies></project>
application.yml 内容如下所示,比较特殊的是引入了 stream 的相关配置,binders 标签指定继承了哪种消息中间件并配置其连接地址、用户名和密码等。bindings 标签表示要对哪些服务进行整合,output 表示这是一个消息的生产者,destination 表示的是 rabbitmq 里面的交换机名称,binder 绑定的是上面指定的消息中间件。
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
主启动类的代码如下:
@SpringBootApplication
public class StreamMQMain8801
{public static void main(String[] args){SpringApplication.run(StreamMQMain8801.class,args);}
}
创建发送消息 Service 接口和其实现类代码如下:
package com.springcloud.service;public interface IMessageProvider {public String send() ;
}
package com.springcloud.service.impl;import com.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;// 第一步:可以理解为是一个消息的发送管道的定义,消息的生产者用 source,消息的消费者用 sink
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider
{@Resourceprivate MessageChannel output; // 第二步:引入消息的发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();// 第三步:调用 send 方法发送消息this.output.send(MessageBuilder.withPayload(serial).build()); // 创建并发送消息System.out.println("***serial: "+serial);return serial;}
}
创建 controller 类用于远程调用发送消息,代码如下:
@RestController
public class SendMessageController
{@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage(){return messageProvider.send();}
}
3.2 测试
启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801 模块,在浏览器输入 http://localhost:8801/sendMessage 进行测试,多刷新几次浏览器。如下图:
可以看到,消息成功的被 rabbitmq 接收了。
四、消息驱动之消费者
4.1 工程创建
新建一个 cloud-stream-rabbitmq-consumer8802 模块用来充当消息的消费者,pom.xml 内容如下所示,只有一个依赖比较特殊。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.springcloud</groupId><artifactId>SpringCloud</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>cloud-stream-rabbitmq-consumer8802</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--引入stream整合rabbit的依赖,如果是kafka,则也引入对于的依赖即可--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--基础配置--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies></project>
application.yml 内容如下所示,比较特殊的是引入了 stream 的相关配置,binders 标签指定继承了哪种消息中间件并配置其连接地址、用户名和密码等。bindings 标签表示要对哪些服务进行整合,input 表示这是一个消息的消费者,destination 表示的是 rabbitmq 里面的交换机名称,binder 绑定的是上面指定的消息中间件。
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
主启动类的代码如下:
@SpringBootApplication
public class StreamMQMain8802
{public static void main(String[] args){SpringApplication.run(StreamMQMain8802.class,args);}
}
创建 listenr 类用于监听和接收消息,代码如下:
@Component
// 第一步:可以理解为是一个消息的发送管道的定义,消息的生产者用 source,消息的消费者用 sink
@EnableBinding(Sink.class)
public class ReceiveMessageListener
{@Value("${server.port}")private String serverPort;// 第二步:使用 @StreamListener 注解进行消息的监听和接收@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort);}
}
4.2 测试
接下来测试使用 8801 发送消息,看 8802 是否可以正常的接收消息,启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801 和 cloud-stream-rabbitmq-consumer8802 模块, 在浏览器输入 http://localhost:8801/sendMessage 进行测试,
可以看到 8081 发送的消息可以被 8082 正常的消费,如下图:
五、分组消费与持久化
5.1 工程创建
参照 cloud-stream-rabbitmq-consumer8802 模块,我们重新创建一个消费者模块 cloud-stream-rabbitmq-consumer8803,可能有点不一样的就是 application.yml,我把它的内容粘出来,如下:
server:port: 8803spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8803.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
5.2 测试
启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块, 在浏览器输入 http://localhost:8801/sendMessage 进行测试,
可以看到 8081 发送的消息可以既被 8082 又被 8083 消费了,如下图:
5.3 重复消费问题
目前是生产者发送消息,8802 和 8803 同时都收到了,存在重复消费问题。
比如在如下场景中,订单系统我们做集群部署,都会从 RabbitMQ 中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用 Stream 中的消息分组来解决。
注意在 Stream 中处于同一个 group 中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。
5.4 分组
5.4.1 原理
微服务应用放置于同一个 group 中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
5.4.2 不同组测试
我们先把 8802 和 8803 都变成不同组进行测试,首先修改 8802 的 application.yml,添加一个 group 属性,如下图:
修改 8803 的 application.yml,添加一个 group 属性,如下图:
启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块, 在浏览器输入 http://localhost:8801/sendMessage 进行测试,
可以得出结论:不同组是可以全面消费的(重复消费)。
5.4.3 同组测试
使用 8802 和 8803 进行同组进行测试,在 8802 和 8803 的 application.yml 中设置相同的 group 属性,都属于 tansunA 组,如下图:
启动 cloud-eureka-server7001、rabbitmq 服务和 cloud-stream-rabbitmq-consumer8801、cloud-stream-rabbitmq-consumer8802 和 cloud-stream-rabbitmq-consumer8803 模块, 在浏览器输入 http://localhost:8801/sendMessage 进行测试,
可以得出结论:同一个组的多个微服务实例,每次只会有一个拿到。
5.5 持久化
停止 8802 和 8803 ,并将 8802 的 group 属性去除掉,然后让 8801 先发送 4 条消息到 rabbitmq。
先启动 8802,无分组属性配置,后台没有打出来消息,如下图:
再启动 8803,有分组属性配置,后台打出来了 MQ 上的消息
可以得出结论:只要你有分组的属性,你的数据就不会丢失。