🥚今日鸡汤🥚
见过一些人,他们朝九晚五😭,有时也要加班,却能把生活过得很😎有趣。他们有自己的爱好,不怕独处。他们有自己的坚持,哪怕没人在乎。🤦♂️
开心一点😁
认真一点🤔
努力一点🫡
目录
😶🌫️1.为什么引入Stream
🥚2.什么是Stream
🧇3.Steam设计思想
🥓4.案例说明
🧂5.重复消费
1.为什么引入Stream🥚🥚🥚
- 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
1.1无感知的使用消息中间件
Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。
1.2中间件和服务的高度解耦
Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
2.什么是Stream 🥚🥚🥚
- 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
- 应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。
- 通过我们配置来binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
3.Steam设计思想🥚🥚🥚
- 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
- 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
- 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
4.案例说明 🥚🥚🥚
- cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
- cloud-stream-rabbitmq-consumer8802,作为消息接收模块
- cloud-stream-rabbitmq-consumer8803,作为消息接收模块
4.1消息驱动-生产者
1.加pom
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot </groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--基础依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--eureka客户端--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--消息驱动--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>
2.改yml
- 注意:小张的Rabbitmq是在Linux上的,所以配置如下:
server:port: 8801spring:application:name: cloud-stream-providerrabbitmq:host: 192.168.20.129port: 5672username: rootpassword: 123456cloud:stream:binders:defaultRabbit:type: rabbitbindings:output:destination: studyExchangecontent-type: application/jsonbinder: defaultRabbiteureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
3.主启动类
@SpringBootApplication
public class StreamMqMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMqMain8801.class);}
}
4.业务类
- 1.创建接口
- 2.创建接口实现类
- @EnableBinding:Spring Cloud Stream中用来启用消息传递功能的注释。
- 它用于将应用程序绑定到消息传递系统(例如,Apache Kafka, RabbitMQ),并声明用于发送和接收消息的输入和输出通道。
- 通过使用@EnableBinding,您可以定义应用程序所需的通道和消息处理程序
@EnableBinding(Source.class)//定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {@Autowiredprivate MessageChannel output; //消息发送管道@Overridepublic String send() {String serial= UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("======serial:"+serial);return null;}
}
5.测试
- 1.浏览器192.168.20.129:15672访问RabbitMQ
- 2.localhost:8801/sendMessage访问
4.2消息驱动-消费者
1.建模块
- 1.在父工程下创建模块cloud-stream-rabbitmq-consumer8802,作为消息接收模块
- 2.注意jdk和maven版本号
2.加pom
- 1.springboot依赖
- 2.通用依赖
- 3.eureka客户端依赖
- 4.消息驱动rabbitmq
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot </groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--基础依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--eureka客户端--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--消息驱动--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>
3.添yml
server:port: 8802spring:application:name: cloud-stream-consumerrabbitmq:host: 192.168.20.129port: 5672username: rootpassword: 123456cloud:stream:binders:defaultRabbit:type: rabbitbindings:input:destination: studyExchangecontent-type: application/jsonbinder: defaultRabbiteureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
4.主启动类
@SpringBootApplication
public class StreamMqMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMqMain8802.class);}
}
5.业务类
- 1.@StreamListener注解是Spring Cloud Stream框架提供的一个注解,用于定义一个消息监听器。
- 2.通过使用@StreamListener注解,可以将一个方法标记为消息的消费者,并指定该方法要监听的消息通道。
- 3.当有消息到达指定的通道时,该方法会被自动触发执行,从而处理这个消息。
- 4.@StreamListener注解通常与@EnableBinding注解一起使用,用于指定所要绑定的消息通道。
- 5.@EnableBinding注解用于绑定消息通道与应用程序中的输入输出接口,@StreamListener注解则用于标记一个方法作为消息的消费者。
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1---接受消息:"+message.getPayload()+",port:"+serverPort);}}
6.测试
- 1.使用8801生产者发送消息
- 2.使用8802消费者接受消息
5.重复消费 🥚🥚🥚
问题描述:
- 1.根据8802,重新创建cloud-stream-rabbitmq-consumer8803,作为消息接收模块
- 2.8801生产者发送消息
- 3.8802,8803都可以接收到
如果一个订单同时被两个服务获取到,就会造成数据错误
注意:在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。
5.1自定义分组
在消费者端添加group配置:分为xzA,xzB
5.2轮询分组
8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。
8802,8803的group配置相同名称,重新启动 ,使用8801发送两条消息,8802接受一条,8803接收一条