文章目录
- 概述
- 标准MQ
- 配置
- POM
- YML
- 示例
- 消息发送
- 配置RabbitMQ可视化插件
- 消息消费者
- 遇到的问题
- 复现
- 解决:修改YML
- 注意
概述
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
官网:
https://spring.io/projects/spring-cloud-stream#overview
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/
中文:
https://m.wang1314.com/doc/webapp/topic/20971999.html
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
标准MQ
配置
POM
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
YML
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
示例
消息发送
- 接口
public interface IMessageProvider
{public String send() ;
}
- 实现类
@EnableBinding(Source.class) // 可以理解为是一个消息的发送管道的定义
public class MessageProviderImpl implements IMessageProvider
{@Resourceprivate MessageChannel output; // 消息的发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();this.output.send(MessageBuilder.withPayload(serial).build()); // 创建并发送消息System.out.println("***serial: "+serial);return serial;}
}
- Controller
@RestController
public class SendMessageController
{@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage(){return messageProvider.send();}
}
配置RabbitMQ可视化插件
rabbitmq-plugins enable rabbitmq_management
http://localhost:15672/
消息消费者
- POM
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基础配置-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency>
- Controller
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort);}
}
遇到的问题
- 有重复消费问题
- 消息持久化问题
复现
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,
那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。
这时我们就可以使用Stream中的消息分组来解决
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),
同一组内会发生竞争关系,只有其中一个可以消费。
解决:修改YML
添加group
spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置group: atguiguA
同一个消费组的多个微服务实例,每次只会有一个拿到
注意
没有分到消费组中,不会持久化,会丢失未曾消费消息