RabbitMQ 学习笔记

RabbitMQ学习笔记

 

一些概念

Broker :RabbitMQ服务。

virtual host: 其实就是分组。

Connection:连接,生产者消费者与Broker之间的TCP连接。

Channel:网络信道,轻量级的Connection,使用Channel可以减少Connection的建立,减少开销。

Message:消息,由 PropertiesBody组成,Properties可以对消息的优先级、延迟等特性进行记录,Body存储消息体的内容。

Exchange:交换机,没有消息存储功能,负责分发消息。

BindingExchangeQueue之间的虚拟连接,其中可以包含Routing Key

Routing Key:路由规则,用于确定如何分发、接收消息。

Queue:消息队列,保存消息并将其转发给消费者进行消费。

安装

Windows安装

安装erLang语言

进入官网

image-20220723085850289

 

 

下载完之后一直下一步安装即可,安装完成后进入目录,配置环境变量

image-20220723092150573

image-20220723092301127

安装RabbitMQ服务端

Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server (github.com)

image-20220723091828280

一直下一步安装即可

安装完成后打开安装目录,进入到这个文件夹打开命令行

image-20220723093324568

输入命令安装插件

rabbitmq-plugins enable rabbitmq_management

完成后双击rabbitmq-server.bat

打开http://localhost:15672/

用户名密码是guest/guest

image-20220723093515104

image-20220723093550183

Linux下使用 Docker 安装

直接拉取最新版

docker pull rabbitmq

运行容器

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

进入容器

docker exec -it rabbitmq /bin/bash

开启管理插件

rabbitmq-plugins enable rabbitmq_management

image-20220723103556298

打开管理网站 http://localhost:15672/

4369, 25672 (Erlang发现&集群端口)

5672, 5671 (AMQP端口)

15672 (web管理后台端口)

61613, 61614 (STOMP协议端口)

1883, 8883 (MQTT协议端口)

用户名密码均为 guest

image-20220723103a414689

实操

官网例子

简单模式

11111

配置文件 application-easy.yml

spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: /queue: easy-queue

生产者:

package com.gettler.rabbitmq.easy;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("easy")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {// 队列名称@Value("${spring.rabbitmq.queue}")public String QUEUE_NAME;private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);@Testpublic void testProducer() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();/*创建一个队列1.队列名称2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)5.其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello, this is an easy message";/*发送一个消息1.发送到那个交换机(空代表默认交换机)2.路由key3.其他的参数信息4.发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());logger.info("消息发送完毕");}
}

消费者:

package com.gettler.rabbitmq.easy;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("easy")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerTest {// 队列名称@Value("${spring.rabbitmq.queue}")public String QUEUE_NAME;private static final Logger logger = LoggerFactory.getLogger(ConsumerTest.class);@Testpublic void testConsumer() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback = (consumerTag, message) -> {logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback = (consumerTag) -> {logger.info("消息消费被中断");};/*消费者消费消息1.消费的队列名称2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)3.消费者消费消息的回调(函数式接口)4.消费者取消消费的回调(函数式接口)*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

image-20240620161232526

工作模式

在这里插入图片描述

配置文件 application-work.yml

spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: /queue: work-queue

生产者:

package com.gettler.rabbitmq.work;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;import java.util.Scanner;@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {// 队列名称@Value("${spring.rabbitmq.queue}")public String QUEUE_NAME;private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Testpublic void testProducer() throws Exception {System.out.println(this.host);// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);/*创建一个队列1.队列名称2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)5.其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();/*发送一个消息1.发送到那个交换机(空代表默认交换机)2.路由key3.其他的参数信息4.发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());logger.info("消息发送完毕");}}
}

消费者A:

package com.gettler.rabbitmq.work;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {// 队列名称@Value("${spring.rabbitmq.queue}")public String QUEUE_NAME;private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);@Testpublic void testConsumerA() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback = (consumerTag, message) -> {logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback = (consumerTag) -> {logger.info("消息消费被中断");};/*消费者消费消息1.消费的队列名称2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)3.消费者消费消息的回调(函数式接口)4.消费者取消消费的回调(函数式接口)*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

消费者B:

package com.gettler.rabbitmq.work;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("work")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {// 队列名称@Value("${spring.rabbitmq.queue}")public String QUEUE_NAME;private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);@Testpublic void testConsumerB() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback = (consumerTag, message) -> {logger.info("消费消息成功,消息内容为:" + new String(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback = (consumerTag) -> {logger.info("消息消费被中断");};/*消费者消费消息1.消费的队列名称2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)3.消费者消费消息的回调(函数式接口)4.消费者取消消费的回调(函数式接口)*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

image-20240620161656576

路由模式

配置文件 application-direct.yml

spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: /

生产者:

package com.gettler.rabbitmq.direct;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);@Testpublic void testProducer() throws Exception {// 创建channelConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);Map<String, String> messageMap = new HashMap<>();messageMap.put("info", "普通 info 信息");messageMap.put("warning", "警告 warning 信息");messageMap.put("error", "错误 error 信息");messageMap.put("debug", "调试 debug 信息");for (Map.Entry<String, String> mes : messageMap.entrySet()) {String routingKey = mes.getKey();String message = mes.getValue();channel.basicPublish("direct", routingKey, null, message.getBytes());logger.info("消息发送完毕");}}
}

消费者A:

package com.gettler.rabbitmq.direct;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);@Testpublic void testConsumerA() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 创建channel// 声明交换机channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);// 声明临时队列channel.queueDeclare("console", false, false, false, null);// 绑定队列与交换机channel.queueBind("console", "direct", "info");channel.queueBind("console", "direct", "warning");// 消费消息DeliverCallback deliverCallback = (consumerTag, message) -> {logger.info("获得消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) -> {logger.info("消息消费被中断");};channel.basicConsume("console", true, deliverCallback, cancelCallback);}
}

消费者B:

package com.gettler.rabbitmq.direct;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("direct")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);@Testpublic void testConsumerB() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT);// 声明临时队列channel.queueDeclare("disk", false, false, false, null);// 绑定队列与交换机channel.queueBind("disk", "direct", "error");// 消费消息DeliverCallback deliverCallback = (consumerTag, message) -> {logger.info("获得消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) -> {logger.info("消息消费被中断");};channel.basicConsume("disk", true, deliverCallback, cancelCallback);}
}

image-20240620161838310

广播模式

配置文件 application-fanout.yml

spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: /

生产者:

package com.gettler.rabbitmq.fanout;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);@Testpublic void testProducer() throws Exception {// 创建channelConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);// 发送10条消息for (int i = 0; i < 10; i++) {String message = i + "";channel.basicPublish("fanout", "", null, message.getBytes());logger.info("消息发送完毕" + message);}}
}

消费者A:

package com.gettler.rabbitmq.fanout;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerATest {private static final Logger logger = LoggerFactory.getLogger(ConsumerATest.class);@Testpublic void testConsumerA() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);// 声明临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列与交换机channel.queueBind(queueName, "fanout", "");// 消费消息DeliverCallback deliverCallback = (consumerTag, message) -> {logger.info("获得消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) -> {logger.info("消息消费被中断");};channel.basicConsume(queueName, true, deliverCallback, cancelCallback);}
}

消费者B:

package com.gettler.rabbitmq.fanout;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("fanout")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ConsumerBTest {private static final Logger logger = LoggerFactory.getLogger(ConsumerBTest.class);@Testpublic void testConsumerB() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT);// 声明临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列与交换机channel.queueBind(queueName, "fanout", "");// 消费消息DeliverCallback deliverCallback = (consumerTag, message) -> {logger.info("获得消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) -> {logger.info("消息消费被中断");};channel.basicConsume(queueName, true, deliverCallback, cancelCallback);}
}

image-20240620162526952

主题模式

配置文件 application-topic.yml

spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: /

生产者:

package com.gettler.rabbitmq.topic;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;/*** @author Gettler* @date 2024/06/13*/
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ProducerTest {private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);@Testpublic void testProducer() throws Exception {// 创建channelConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);Map<String, String> messageMap = new HashMap<>();messageMap.put("class1.DB.exam", "一班数据库考试通知");messageMap.put("class1.OS.exam", "一班操作系统考试通知");messageMap.put("class2.DB.exam", "二班数据库考试通知");messageMap.put("class2.OS.exam", "二班操作系统考试通知");for (Map.Entry<String, String> mes : messageMap.entrySet()) {String routingKey = mes.getKey();String message = mes.getValue();channel.basicPublish("topic", routingKey, null, message.getBytes());logger.info("消息发送完毕");}}
}

消费者A(模拟一班的学生):

package com.gettler.rabbitmq.topic;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;/*** @author Gettler* @date 2024/06/13*/
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class StudentOfClass1Consumer {private static final Logger logger = LoggerFactory.getLogger(StudentOfClass1Consumer.class);@Testpublic void testStudentOfClass1Consumer() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);// 创建Q1队列channel.queueDeclare("student_of_class1", false, false, false, null);// 绑定队列与交换机channel.queueBind("student_of_class1", "topic", "class1.#");// 消费消息DeliverCallback deliverCallback = (consumerTag, message) -> {logger.info("获得消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) -> {logger.info("消息消费被中断");};channel.basicConsume("student_of_class1", true, deliverCallback, cancelCallback);}
}

消费者B(模拟操作系统老师):

package com.gettler.rabbitmq.topic;import com.gettler.rabbitmq.RabbitmqApplication;
import com.gettler.rabbitmq.config.RabbitMqConnectionFactory;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;/*** @author Gettler* @date 2024/06/13*/
@ActiveProfiles("topic")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class, webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)
public class TeacherConsumer {private static final Logger logger = LoggerFactory.getLogger(TeacherConsumer.class);@Testpublic void testTeacherConsumer() throws Exception {// 创建一个connectionConnection connection = RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);// 创建Q1队列channel.queueDeclare("teacher_of_OS", false, false, false, null);// 绑定队列与交换机channel.queueBind("teacher_of_OS", "topic", "#.OS.#");// 消费消息DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("获得消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断");};channel.basicConsume("teacher_of_OS", true, deliverCallback, cancelCallback);}
}

image-20240620162754734

谷粒商城 RabbitMQ 学习笔记

新建Maven项目

添加依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.0.0</version>
</dependency>

编写发送端

package org.example;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class Send  
{  //队列名称  private final static String QUEUE_NAME = "helloMQ";  public static void main(String[] argv) throws java.io.IOException, TimeoutException  {  /** * 创建连接连接到MabbitMQ */  ConnectionFactory factory = new ConnectionFactory();  //设置MabbitMQ所在主机ip或者主机名  factory.setHost("localhost");  //创建一个连接  Connection connection = factory.newConnection();  //创建一个频道  Channel channel = connection.createChannel();  //指定一个队列  channel.queueDeclare(QUEUE_NAME, false, false, false, null);  //发送的消息  String message = "hello world!";  //往队列中发出一条消息  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  System.out.println(" [x] Sent '" + message + "'");  //关闭频道和连接  channel.close();  connection.close();  }  
}  

编写接收端

package org.example;import com.rabbitmq.client.*;import java.io.IOException;public class Recv {// 队列名称private final static String QUEUE_NAME = "helloMQ";public static void main(String[] argv) throws Exception {// 打开连接和创建频道,与发送端一样ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//创建消费者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(QUEUE_NAME, true, consumer);}
}

运行接收端

image-20220723101156639

运行发送端,每运行一次发送一次消息

image-20220723101246973

管理网站上有接收端的连接(发送端发送后便断开连接了)

image-20220723101256826

添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
spring.rabbitmq.host=192.168.3.200
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
创建Exchange
public void createExchange() {DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);amqpAdmin.declareExchange(directExchange);
}
创建Queue
public void createQueue() {Queue queue = new Queue("hello-java-queue", true, false, false);amqpAdmin.declareQueue(queue);
}
连接Queue和Exchange
public void createBinding() {Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);amqpAdmin.declareBinding(binding);
}
发送消息
public void sendMessage() {String msg = "hello world";List<String> s = new ArrayList<>();s.add(msg);s.add("List");rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", s, new CorrelationData(UUID.randomUUID().toString()));
}
接收消息

想要接受对象消息,需使用JSON序列化机制,进行消息转换

编写MyRabbitConfig配置类

@Configuration
public class MyRabbitConfig {@AutowiredRabbitTemplate rabbitTemplate;/*** 使用JSON序列化机制,进行消息转换* @return*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

使用RabbitListener注解监听队列,该注解参数可以是Object content, Message message, Channel channel。

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Object message) {System.out.println("接受到消息内容:" + message);
}
可靠抵达

编写配置文件

# 开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 抵达队列后以异步发送优先回调抵达队列后的回调returnconfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

将MyRabbitConfig修改为

@Configuration
public class MyRabbitConfig {@AutowiredRabbitTemplate rabbitTemplate;/*** 使用JSON序列化机制,进行消息转换** @return*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@PostConstruct // MyRabbitConfig对象创建完成后执行该方法public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 消息抵达节点的话ack就为true* @param correlationData   当前消息的唯一关联数据(消息唯一ID)* @param ack 消息是否成功收到* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirming...correlationData{" + correlationData + "},ack{" + ack + "},cause{" + cause + "}");}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 没抵达队列,触发这个失败回调函数* @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("Unreachable...message{" + message + "},replyCode{" + replyText + "},exchange{" + exchange + "},routingKey{" + routingKey + "}");}});}
}

监听队列方法修改为

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, List list, Channel channel) throws IOException {System.out.println("接受到消息内容:" + list);// channel内按顺序递增long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println(deliveryTag);// 签收try {channel.basicAck(deliveryTag, false); // 是否批量签收} catch (Exception e) {// 网络中断// b1 = false 丢弃, b1 = true 发回服务器,服务器重新入队。channel.basicNack(deliveryTag, false, false);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/31232.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024广东省职业技能大赛云计算赛项实战——Minio服务搭建

Minio服务搭建 前言 这道题是比赛时考到的&#xff0c;没找到具体题目&#xff0c;但在公布的样题中找到了&#xff0c;虽然很短~ 使用提供的 OpenStack 云平台&#xff0c;申请一台云主机&#xff0c;使用提供的软件包安装部署 MINIO 服务并使用 systemctl 管理 Minio是一个…

HTML静态网页成品作业(HTML+CSS)——手机电子商城网页(4个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有4个页面。 二、作品演示 三、代…

Vue 封装组件之Input框

封装Input组件:MyInput.vue <template><div class"base-input-wraper"><el-inputv-bind"$attrs"v-on"$listeners"class"e-input":style"inputStyle":value"value":size"size"input&quo…

深入解析微软Edge浏览器:探索其功能与应用

微软Edge浏览器是微软公司推出的一款现代化网页浏览器,旨在为用户提供快速、安全和高效的上网体验。本文将全面解析微软Edge浏览器,从其历史背景、核心功能、性能表现、安全特性到实际应用场景,带领读者深入了解这款浏览器的优势和使用技巧。 一、Edge浏览器的历史背景 1.…

python API自动化(Pytest+Excel+Allure完整框架集成+yaml入门+大量响应报文处理及加解密、签名处理)

1.pytest数据参数化 假设你需要测试一个登录功能&#xff0c;输入用户名和密码后验证登录结果。可以使用参数化实现多组输入数据的测试: 测试正确的用户名和密码登录成功 测试正确的用户名和错误的密码登录失败 测试错误的用户名和正确的密码登录失败 测试错误的用户名和密码登…

转换普通文件为Spring中的MultipartFile类型:处理不同文件格式

在Web开发中&#xff0c;处理文件上传是一个常见的需求。有时&#xff0c;我们需要将普通的文件对象&#xff08;如.txt或.xlsx文件&#xff09;转换为Spring框架中的MultipartFile类型&#xff0c;以便在Controller中处理文件上传和处理。本文将介绍如何在Java中进行这种转换&…

2024广东省职业技能大赛云计算赛项实战——chkrootkit安装与使用

chkrootkit安装与使用 前言 在今年比赛中有考到这样一道题&#xff0c;也是在公布的样卷中找到了它&#xff0c;今天就做做它并讲解一下&#xff1a; 公有云安全&#xff1a;入侵检测系统 使用提供的 makechk.tar.gz 包安装 chkrootkit 入侵检测工具&#xff0c;安装完毕后使…

音乐界的颠覆与挑战分析

最近的一个月&#xff0c;音乐界掀起了一场科技革命。一系列音乐大模型轮番上线&#xff0c;将素人生产音乐的门槛降到了最低。这一系列科技产品&#xff0c;不仅引发了大众对音乐产业未来发展的热烈讨论&#xff0c;也带来了新的挑战和问题。  一方面&#xff0c;这些AI音乐…

定时器-前端使用定时器3s轮询状态接口,2min为接口超时

背景 众所周知&#xff0c;后端是处理不了复杂的任务的&#xff0c;所以经过人家的技术讨论之后&#xff0c;把业务放在前端来实现。记录一下这次的离大谱需求吧。 如图所示&#xff0c;这个页面有5个列表&#xff0c;默认加载计划列表。但是由于后端的种种原因&#xff0c;这…

PHP XML: 解析、生成与操作指南

PHP XML: 解析、生成与操作指南 PHP,作为一种流行的服务器端脚本语言,提供了强大的功能来处理XML(可扩展标记语言)数据。XML是一种用于存储和传输数据的标记语言,它通过标签来标识数据。在Web开发中,XML常用于数据交换、配置文件和结构化文档。本文将详细介绍如何使用PH…

C++ | Leetcode C++题解之第171题Excel表列序号

题目&#xff1a; 题解&#xff1a; class Solution { public:int titleToNumber(string columnTitle) {int number 0;long multiple 1;for (int i columnTitle.size() - 1; i > 0; i--) {int k columnTitle[i] - A 1;number k * multiple;multiple * 26;}return num…

QT中利用QMovie实现动态加载效果

1、效果 2、代码 #include "widget.h" #include "ui_widget.h" #include <QLabel> #include <QMovie>

【Python Cookbook】S02E15 在文本中处理 HTML 和 XML 实体

目录 问题解决方案讨论 问题 我们如果想要将 HTML 实体以及 XML 实体内容替换成相对应的文本内容&#xff0c;怎么做&#xff1f; 解决方案 s "Elements are written as <tag>txt</tag>."import html print(html.escape(s)) print("-"*20,…

数据分析中如何理解透视表

透视表&#xff08;Pivot Table&#xff09;是数据分析中一种非常强大的工具&#xff0c;用于快速汇总、分析、探索和展示数据。透视表可以根据数据的不同维度和指标进行重组和计算&#xff0c;帮助用户从大量数据中提取有用的信息和发现数据中的模式和趋势。 透视表的基本概念…

在服务器上搭配大模型的运行环境详细版(docker+ollama+langchain等工具)

用到的工具 1.anaconda3环境安装 anaconda3导出环境 #导出环境 conda env export --name your_env_name > custom_environment.yaml #导入环境 conda env create -f environment.yaml2.前置的docker软件安装、docker镜像如何进行转移 sudo apt-get update #时期能访问阿…

在Linux/Ubuntu/Debian中使用SSH连接远程服务器VPS

在Linux/Ubuntu/Debian中使用SSH连接远程服务器VPS 在远程管理服务器时&#xff0c;SSH&#xff08;Secure Shell&#xff09;协议是我们常用的工具之一。它提供了一种加密的方式来访问和管理远程主机。默认情况下&#xff0c;SSH使用22端口&#xff0c;但有时我们需要通过指定…

Redisson-DelayedQueue-原理

归档 GitHub: Redisson-DelayedQueue-原理 Unit-Test RedissonDelayedQueueTest 常规测试 Test public void testCommon() throws InterruptedException {RBlockingQueue<String> destinationQueue redisson.getBlockingQueue("delay_queue"); // 目标队…

YOLOv10训练自己的数据集(图像目标检测)

目录 1、下载代码 2、环境配置 3、准备数据集 4、yolov10训练 可能会出现报错&#xff1a; 1、下载代码 源码地址&#xff1a;https://github.com/THU-MIG/yolov10 2、环境配置 打开源代码&#xff0c;在Terminal中&#xff0c;使用conda 创建虚拟环境配置 命令如下&a…

Python基础教程(二十五):内置函数整理

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…

Java基础入门day66

day66 内网穿透 NatApp NATAPP-内网穿透 基于ngrok的国内高速内网映射工具 开启您的内网穿透之旅 下载安装后&#xff0c;注册一个账号&#xff0c;可以免费使用内网穿透&#xff0c;不稳定&#xff0c;也可以自行选择收费版本 在下载好app的同级目录&#xff0c;放置一个con…