一、介绍
(1)提供统一接口操作不同厂商的消息队列组件,降低学习成本
(2)生产者和消费者只需操作binder对象即可与消息队列交互,生产者output,消费者input
(3)核心概念:发布订阅、消费组、分区
(4)使用topic模式
二、项目搭建
(1)生产者
a、编写pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the message producer module (cloud-stream-rabbitmq-provider8801). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Dependencies without an explicit version rely on version management outside this file. -->
    <parent>
        <artifactId>demo20220821</artifactId>
        <groupId>com.wsh.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <dependencies>
        <!-- Spring Cloud Stream with the RabbitMQ binder -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- Eureka client for service registration/discovery -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- Shared project module -->
        <dependency>
            <groupId>com.wsh.springcloud</groupId>
            <artifactId>cloud-api-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
b、编写application.yml
# Configuration of the message producer service.
server:
  port: 8801

spring:
  application:
    name: cloud-stream-rabbit-provider
  cloud:
    stream:
      # Binder definitions; "defaultRabbit" is the name referenced by the binding below.
      binders:
        defaultRabbit:
          type: rabbit
          # Connection settings of the RabbitMQ broker used by this binder.
          environment:
            spring:
              rabbitmq:
                host: 192.168.0.166
                port: 5672
                username: guest
                password: guest
      bindings:
        # Settings of the channel named "output".
        output:
          destination: testExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client:
    # Set to true for a client
    register-with-eureka: true
    # Set to true for a client
    fetch-registry: true
    service-url:
      # defaultZone: http://localhost:7001/eureka
      defaultZone: http://eureka1.com:7001/eureka, http://eureka2.com:7002/eureka
  instance:
    instance-id: cloudSreamRabbitProvider8801
    prefer-ip-address: true

management:
  endpoints:
    web:
      exposure:
        include: "*"
c、编写启动类
package com.wsh.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * Startup class of the message producer application.
 *
 * <p>Declared as a Spring Boot application and as a Eureka client; {@link #main(String[])}
 * hands control to {@link SpringApplication}.
 *
 * <p>Date: 2023/10/15
 *
 * @author wshaha
 * @version V1.0
 */
@SpringBootApplication
@EnableEurekaClient
public class StreamRabbitMqProvider8801 {

    /**
     * Launches the application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(StreamRabbitMqProvider8801.class, args);
    }
}
d、编写接口及实现类
package com.wsh.springcloud.service;

/**
 * Contract for components that publish a message.
 */
public interface IMessageProvider {

    /**
     * Sends a message.
     *
     * @return a string result defined by the implementation; may be {@code null}
     */
    String send();
}
package com.wsh.springcloud.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;/*** @ClassName MessageProviderImpl* @Description: TODO* @Author wshaha* @Date 2023/10/15* @Version V1.0**/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {@Autowired@Qualifier("output")private MessageChannel messageChannel;@Overridepublic String send() {messageChannel.send(MessageBuilder.withPayload("hello").build());return null;}
}
e、编写Controller
package com.wsh.springcloud.controller;

import com.wsh.springcloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller exposing an endpoint that triggers a message send.
 *
 * <p>Date: 2023/10/15
 *
 * @author wshaha
 * @version V1.0
 */
@RestController
public class MessageController {

    /** Provider the endpoint delegates to. */
    @Autowired
    private IMessageProvider messageProvider;

    /**
     * Handles {@code GET /sendMessage} by calling {@link IMessageProvider#send()}.
     * The provider's return value is discarded, so the response has no body.
     */
    @GetMapping("/sendMessage")
    public void sendMessage() {
        messageProvider.send();
    }
}
(2)编写消费者
a、编写pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the message consumer module (cloud-stream-rabbitmq-consumer8802). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Dependencies without an explicit version rely on version management outside this file. -->
    <parent>
        <artifactId>demo20220821</artifactId>
        <groupId>com.wsh.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>

    <dependencies>
        <!-- Spring Cloud Stream with the RabbitMQ binder -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- Eureka client for service registration/discovery -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- Shared project module -->
        <dependency>
            <groupId>com.wsh.springcloud</groupId>
            <artifactId>cloud-api-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
b、编写application.yml
# Configuration of the message consumer service.
server:
  port: 8802

spring:
  application:
    # Consumer-specific name: the original reused the producer's
    # "cloud-stream-rabbit-provider", registering both services under one application.
    name: cloud-stream-rabbit-consumer
  cloud:
    stream:
      # Binder definitions; "defaultRabbit" is the name referenced by the binding below.
      binders:
        defaultRabbit:
          type: rabbit
          # Connection settings of the RabbitMQ broker used by this binder.
          environment:
            spring:
              rabbitmq:
                host: 192.168.0.166
                port: 5672
                username: guest
                password: guest
      bindings:
        # Settings of the channel named "input".
        input:
          destination: testExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client:
    # Set to true for a client
    register-with-eureka: true
    # Set to true for a client
    fetch-registry: true
    service-url:
      # defaultZone: http://localhost:7001/eureka
      defaultZone: http://eureka1.com:7001/eureka, http://eureka2.com:7002/eureka
  instance:
    # Must differ from the producer's instance-id (cloudSreamRabbitProvider8801);
    # the original reused it, so the two instances collided in the Eureka registry.
    instance-id: cloudStreamRabbitConsumer8802
    prefer-ip-address: true

management:
  endpoints:
    web:
      exposure:
        include: "*"
c、编写启动类
package com.wsh.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * Startup class of the message consumer application.
 *
 * <p>Declared as a Spring Boot application and as a Eureka client; {@link #main(String[])}
 * hands control to {@link SpringApplication}.
 *
 * <p>Date: 2023/10/15
 *
 * @author wshaha
 * @version V1.0
 */
@SpringBootApplication
@EnableEurekaClient
public class StreamRabbitMqConsumer8802 {

    /**
     * Launches the application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(StreamRabbitMqConsumer8802.class, args);
    }
}
d、编写Controller
package com.wsh.springcloud.controller;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;

/**
 * Listener bound to the {@link Sink} input channel that prints every received payload.
 *
 * <p>Date: 2023/10/15
 *
 * @author wshaha
 * @version V1.0
 */
@RestController
@EnableBinding(Sink.class)
public class ConsumerController {

    /**
     * Receives a message from {@link Sink#INPUT} and writes its payload to standard output.
     *
     * @param message the incoming message carrying a string payload
     */
    @StreamListener(Sink.INPUT)
    public void receiveMessage(Message<String> message) {
        final String payload = message.getPayload();
        System.out.println(payload);
    }
}
(3)运行
三、解决消息重复消费
(1)绑定同一交换机且不同组的消费者会收到相同消息
(2)解决方式:同一组内的多个消费者中,每条消息只会被其中一个消费者收到,故将这些消费者配置为同一个组即可
(3)配置:在各消费者的application.yml中,于spring.cloud.stream.bindings.input下设置相同的group(例如 group: groupA)
四、消息持久化
(1)定义分组后会实现消息持久化,原理:没定义分组时,服务对应的队列是auto-delete的,服务停止后就删除队列,后续发送的消息无法收到;定义分组后队列是持久的(durable),服务重启后仍可收到停止期间发送的消息