使用 idea 创建 Springboot 项目
添加 Spring cloud stream 和 rabbitmq 依赖
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>springcloudstream-demo1</artifactId><version>0.0.1-SNAPSHOT</version><name>springcloudstream-demo1</name><description>springcloudstream-demo1</description><properties><java.version>17</java.version><spring-cloud.version>2023.0.0-RC1</spring-cloud.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-test-binder</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build><repositories><repository><id>spring-milestones</id><name>Spring Milestones</name><url>https://repo.spring.io/milestone</url><snapshots><enabled>false</enabled></snapshots></repository></repositories></project>
修改配置文件
--- # rabbitmq 消费者配置
spring:rabbitmq:host: localhostport: 5672username: ruoyipassword: ruoyi123cloud:stream:rabbit:bindings:demo2-in-0:consumer:delayed-exchange: truebindings:demo-in-0:content-type: application/jsondestination: demo-destinationgroup: demo-groupbinder: rabbitdemo1-in-0:content-type: application/jsondestination: demo1-destinationgroup: demo1-groupbinder: rabbitdemo2-in-0:content-type: application/jsondestination: demo2-destinationgroup: demo2-groupbinder: rabbitfunction:definition: demo;demo1;demo2
创建消费者
package com.example.springcloudstreamdemo1;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import java.util.function.Consumer;@SpringBootApplication
public class SpringcloudstreamDemo1Application {public static void main(String[] args) {SpringApplication.run(SpringcloudstreamDemo1Application.class, args);}/*** 注意方法名称 demo 要与配置文件中的spring.cloud.stream.bindings.demo-in-0 保持一致* 其中 -in-0 是固定写法,in 标识消费者类型,0是消费者索引*/@Beanpublic Consumer<Person> demo() {return person -> {System.out.println("Received: " + person);};}@Beanpublic Consumer<String> demo1() {return msg -> {System.out.println("Received: " + msg);};}@Beanpublic Consumer<Person> demo2() {return msg -> {System.out.println("Received: " + msg);};}public static class Person {private String name;public String getName() {return name;}public void setName(String name) {this.name = name;}public String toString() {return this.name;}}
}
注意当多个消费者时,需要添加配置项:spring.cloud.function.definition
启动项目
启动日志
查看 mq 控制台
交换机信息
交换机名称对应:
spring.cloud.stream.bindings.demo-in-0.destination配置项的值
队列信息
- 队列名称是交换机名称+分组名
添加生产者配置
--- #生产者配置
spring:cloud:stream:rabbit:bindings:demo2-out-0:producer:delayedExchange: true #设置为延迟队列bindings:demo-out-0:content-type: application/jsondestination: demo-destination # 同消费者保持一致binder: rabbitdemo1-out-0:content-type: application/jsondestination: demo1-destinationbinder: rabbitdemo2-out-0:content-type: application/jsondestination: demo2-destinationbinder: rabbit
创建消息生产者
package com.example.springcloudstreamdemo1;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController
{@Autowiredprivate StreamBridge streamBridge;@GetMapping("sendMsg")public String sendMsg(int delay, String name){//delay 延时时间毫秒SpringcloudstreamDemo1Application.Person person = new SpringcloudstreamDemo1Application.Person();person.setName(name);Message<SpringcloudstreamDemo1Application.Person> message = MessageBuilder.withPayload(person).setHeader("x-delay", delay).build();// 发送延时消息streamBridge.send("demo2-out-0", message);streamBridge.send("demo1-out-0", person);streamBridge.send("demo-out-0", person);return "发送成功";}
}
启动测试
发送消息
http://localhost:8080/sendMsg?delay=10000&name=zhangsan
打印消息
问题总结
问题一
Multiple functional beans were found [*,*], thus can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it.
解决办法:
查看配置是否正确:
spring.cloud.function.definition