RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

 MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。

功能1:流量消峰

功能2:应用解耦

功能3:异步处理

MQ的分类:

1.Kafka

2.RabbitMQ

RabbitMQ概念:

四大核心概念:

交换机:

队列: 

 六大核心模式:

1.简单模式。2.工作模式。3.发布订阅模式。4.路由模式。5.主题模式。6.发布确认模式。

RabbitMQ工作原理:

Channer:信道,发消息的通道。

下载:

1. 官网地址:https://www.rabbitmq.com/download.html。参考的下载地址如下:Linux下安装RabbitMQ_rabbitmq下载_零碎de記憶的博客-CSDN博客

2.安装Erlang环境

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers

3.下载Erlang,方式1:找到下面网址,在网址中下载rpm文件:

el/7/erlang-22.3.4.12-1.el7.x86_64.rpm - rabbitmq/erlang · packagecloud

或者直接输入下面指令下载rpm文件: 

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm

然后输入下面的命令安装已下载的安装包:

yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm

4.下载RabbitMQ,输入下面的下载

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm

 输入下面的命令进行本地安装:

yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm

5. 下载socat,检查是否已下载:

yum install socat -y

注意以下的操作都要在 /usr/local/software目录下查看: 

6.添加开机启动RabbitMQ服务:chkconfig rabbitmq-server on。启动rabbitmq /sbin/service rabbitmq-server start。

7.查看服务状态 /sbin/service rabbitmq-server status

8.停止服务 /sbin/service rabbitmq-server stop。重新查看服务状态。

10.开启web管理界面 sudo rabbitmq-plugins enable rabbitmq_management

11.查看防火墙状态:systemctl status firewalld。关闭防火墙:systemctl stop firewalld。关闭rabbit服务器输入:sudo rabbitmqctl stop。开启rabbit服务器输入:sudo rabbitmq-server -detached。

12.在浏览器中输入地址:Linux服务器ip地址:15672,可访问web管理界面。

13.用户名guest,密码默认,但无法登陆,无权限。

14.rabbitmqctl list_users查看用户。创建账号 rabbitmqctl add_user admin 123。设置用户角色为管理员 rabbitmqctl set_user_tags admin administrator。设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"。

15.再经尝试可以重新登录:

创建Java开发环境

1.创建1个新项目,命名atguigu-rabbitmq,然后创建模块Module。GroupId可以填写:com.atguigu.rabbitmq,ArtifactId可以填rabbitmq-hello,选择quickstart:

导入依赖如下:

  <dependencies><!--rabbitmq依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency><dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version> <!-- 根据你的需求指定版本号 --></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>

在下图中,P是生产者,C是消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓存区。

生产者代码

public class producer {//队列名称public static final String QUEUE_NAME = "hello";//发消息public static void main( String[] args ) throws IOException, TimeoutException {//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.136");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel = connection.createChannel();//第5步:生成一个队列(队列名称,是否持久化,是否排他,自动删除,队列参数)//持久化:是否存储入磁盘,默认是将消息存储在内存中//排他:队列是否只供一个消费者消费,是否进行消息共享,true可以供多个消费者消费//自动删除:最后一个消费者断开连接后,该队列是否自动删除channel.queueDeclare(QUEUE_NAME,false,false,false,null);//第6步:发消息,(交换机,路由key本次是队列名,参数,发送的消息)String message = "hello world";channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功!!!");}
}

消费者代码

public class consumer {public static final String QUEUE_NAME = "hello";public static void main(String [] args) throws IOException, TimeoutException {//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.136");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel = connection.createChannel();//第5步:声明,接收消息DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody()));};//第6步:取消消息时的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};//第7步:接收,(队列名,自动or手动,接收消息,回调)//1.消费哪个队列;2.消费成功后是否要自动应答true代表自动应答,false表示手动应答//3.消费者未成功消费的回调channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

注意几点:1.确保rabbitmq处于开启的状态(开启方式见前面)2.最好让防火墙处于关闭的状态 3.最好通过方法左侧的开关按钮进行启动,确保启动是选择Current File。

 

工作队列:

工作队列:又称任务队列,主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

情况:生产者大量分发消息给队列,工作线程接收队列的消息,工作线程不止一个,三者关系时竞争关系,你一条我一条他一条,但要注意一个消息只能被处理一次,不能被处理多次。

重复性的代码可以被抽取成为工具类。

在java — com — atguigu — rabbitmq下创建utils包,工具类起名RabbitMqUtils,放入如下代码:

public class RabbitMqUtils {public static Channel getChannel() throws Exception{//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.137");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel =  connection.createChannel();return channel;}
}

工作线程的更新后,worker01的代码如下:

public static final String QUEUE_NAME = "hello";public static void main(String [] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//声明队列,没有会报错//消息接收DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("接收到的消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag)->{System.out.println(consumerTag + "消息被取消消费接口回调逻辑");};System.out.println("c1等待接收消息......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}

重复利用one包下的consumer类,将其更改为c2工作线程:

Task01作为生产者用于生产数据,与前面不同的是,Task01支持从IDEA控制台输入数据:

public class Task01 {public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接收信息Scanner scanner = new Scanner(System.in); //扫描控制台输入内容while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成..");}}
}

  

消息应答:

概念:

自动应答:

手动应答:

手动应答好处,建议不批量应答,选择false:

 消息自动重新入队:

原本正常传输,C1突然失去连接,检测到C1断开连接,于是会让消息重新入队,原本的消息交由C2进行处理。

实验思路:写1个生产者,2个消费者,当关闭掉其中1个工作线程,消息不丢失,还被另一个工作线程接收。消费在手动应答时不丢失、放回队列中重新消费。

消息手动应答(生产者):

public class Task2 {
public static final String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(task_queue_name,false,false,false,null);Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",task_queue_name,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}
}
}

消息手动应答(消费者):

public class Work03 {public static final String task_queue_name = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息处理时间较短");DeliverCallback deliverCallback = (consumerTag,message)->{SleepUtils.sleep(1);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//1.消息的标记tag 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck = false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{System.out.println(consumerTag + "消费者取消消费接口回调逻辑");}));}
}
public class Work04 {public static final String task_queue_name = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2等待接收消息处理时间较短");DeliverCallback deliverCallback = (consumerTag,message)->{SleepUtils.sleep(30);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//1.消息的标记tag 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck = false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{System.out.println(consumerTag + "消费者取消消费接口回调逻辑");}));}
}

实现效果:在生产者输入AA BB CC DD EE等消息,消费者1接收速度快,会立即打印AA CC EE等消息,消费者2接收速度慢,会在一段时间后接收到BB,此时如果关闭消费者2,则消费者1输出DD,表明消费在手动应答时不丢失、放回队列中重新消费。

持久化:

如果报错,说明原本的队列就是不持久化,此时无法设定持久化,只能先将队列删除然后再重新设定。

控制队列持久化,需要修改生产者声明函数的第2个参数:

消息持久化:

队列持久化和消息持久化不同,队列是MQ里的一个组件,消息是生产者发送的消息。

如果要让消息持久化,在发消息的时候就要通知队列。

更改的是生产者的信道的basicPublish的第3个参数,添加MessageProperties.PERSISTENT_TEXT_PLAIN

不公平分发: 

消费者处理任务的速度不一致,为了不让速度快的消费者长时间处于空闲状态,因此采用不公平分发。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

预取值:

前面N条数据分别交给谁处理,如下图就是前7条数据中,2条给C1,5条给C2

发布确认原理:

1.设置要求队列必须持久化:就算服务器宕机,队列也不至于消失。

2.设置要求队列中的消息也必须持久化。

3. 发布确认,消息保存到磁盘上之后,队列要告知生产者。

Channel channel = connection.createChannel();
channel.confirmSelect();

public static void main(String[] args){
}

单个发布确认:

是一种同步确认发布的方式,发布消息-确认消息-发布消息...必须要确认后才能继续发布,类似于一手交钱一手交货,缺点是发布速度很慢。

1. 创建com/atguigu/rabbitmq/four文件夹下的ConfirmMessage

public static void publishMessageIndividually() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();for(int i=0;i<MESSAGE_COUNT;i++){String message = i +"";channel.basicPublish("",queueName,null,message.getBytes());boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");}

批量发布确认:

public static void publishMessageBatch() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();int batchSize = 100; //批量确认消息的大小//批量发送消息,批量发布确认for(int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//判断达到100条消息的时候,批量确认一次if(i%batchSize==0) channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");
}

异步发布确认:

map序列,key是消息序号(deliveryTag是消息的标识,multiple是是否为批量),value是消息内容,将消息每一条都编号,broker会对消息进行应答,分为两种一种是确认应答,另一种是未确认应答。消息生产者不需要等待接收方的消息,只需要负责发送消息即可,消息是否应答最终会以异步的形式回传,也就是说确认的时间可以是稍后的。

addConfirmListener单参数的是只能监听成功的,多参数的是可以监听成功也可以监听失败的,都是接口需要自己来写。

public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();//消息确认成功,回调函数ConfirmCallback ackCallback = (deliveryTag, multiple)->{System.out.println("确认的消息:"+deliveryTag);};//消息确认失败回调函数ConfirmCallback nackCallback = (deliveryTag, multiple)->{System.out.println("未确认的消息:"+deliveryTag);};//准备消息的监听器,监听哪些消息成功了,哪些消息失败了channel.addConfirmListener(ackCallback,nackCallback);//批量发送消息for(int i=0;i<MESSAGE_COUNT;i++){String message="消息"+i;channel.basicPublish("",queueName,null,message.getBytes());//发布确认}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");
}

 处理异步未确认消息:

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认/*线程安全有序的一个哈希表,适用于高并发的情况下1.轻松地将序号与消息进行关联2.轻松地批量删除条目只要给到序号3.支持高并发(多线程)*/ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();//消息确认成功,回调函数ConfirmCallback ackCallback = (deliveryTag, multiple)->{if(multiple){//2.删除掉已经确认的消息,剩下的就是未确认的消息ConcurrentNavigableMap<Long, String> confirmd =outstandingConfirms.headMap(deliveryTag);}else{outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败回调函数ConfirmCallback nackCallback = (deliveryTag, multiple)->{//3.打印一下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是:"+message+"未确认的消息:"+deliveryTag);};//准备消息的监听器,监听哪些消息成功了,哪些消息失败了channel.addConfirmListener(ackCallback,nackCallback);long begin = System.currentTimeMillis();//批量发送消息for(int i=0;i<MESSAGE_COUNT;i++){String message="消息"+i;channel.basicPublish("",queueName,null,message.getBytes());//1.此处记录下所有发送的消息,消息的总和outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");}
}

三种方式对比:

交换机:

一个消息可以被消费多次,需要通过交换机,仍旧遵循队列中的消息只能被消费一次。

 生产者生产的消息从不会直接发送到队列。生产者将消息发送到交换机。交换机负责接收来自生产者的消息,将消息推入队列。

Exchanges的类型:直接(direct),主题(topic),标题(headers),扇出(fanout)

消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的。

创建临时队列:

String queueName = channel.queueDedare().getQueue();

绑定:

根据Routing key来确定消息要发给哪个队列,如果Routing Key相同消息就可以发送给多个队列。

先添加一个队列queue1,再添加一个交换机exchange1,最后点击exchange1交换机,进入绑定菜单,然后输入绑定的队列是queue1,然后Routing key随便设置为123。

广播Fanout:

Fanout(扇出)是将接收到的所有消息广播到它知道的所有队列中。如果Routing Key相同则发送给队列以相同消息。

生产者:

public class EmitLog {public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息"+message);}}
}

消费者:

public class ReceiveLogs01 {public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明一个交换机//声明一个队列临时队列,队列的名称是随机的,当消费者断开与队列的连接时候,队列就删除了String queueName = channel.queueDeclare().getQueue();//绑定交换机与队列channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息,把接收到消息打印在屏幕上......");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("ReceiveLogs01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});}
}

效果:实现广播的功能

Direct路由交换机:

消费者1:

public class ReceiveLogsDirect01 {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);channel.queueDeclare("console",false,false,false,null);channel.queueBind("console",EXCHANGE_NAME,"info"); //队列名称,交换机名称,RoutingkeyDeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume("console",true,deliverCallback,consumerTag->{});}
}

消费者2:

public class ReceiveLogsDirect02 {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);channel.queueDeclare("disk",false,false,false,null);channel.queueBind("disk",EXCHANGE_NAME,"error"); //队列名称,交换机名称,RoutingkeyDeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("ReceiveLogsDirect02控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume("disk",true,deliverCallback,consumerTag->{});}
}

生产者: 

public class DirectLogs {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息"+message);}}
}

效果:

如果【channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));】的第2个参数填info就只会发送消息给消费者1,填写error就只会发送消息给消费者2。

Topics主题交换机:

发布(生产者)订阅(消费者)模式:

消费者1:

public class ReceiveLogsTopic01 {public static final String EXCHANGE_NAME="topic_logs";//交换机名称public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"topic");String queueName="Q1";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());};channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});}
}

消费者2:

public class ReceiveLogsTopic02 {public static final String EXCHANGE_NAME="topic_logs";//交换机名称public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"topic");String queueName="Q2";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());};channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});}
}

 生产者1:

public class EmitLogTopic {public static final String EXCHANGE_NAME="topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Map<String,String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到");bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey = bindingKeyEntry.getKey();String message =  bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}}
}

结果:

死信:

无法被消费消息被称为死信

死信的来源:

死信实战架构图:

1个生产者2个消费者。生产者原本走正常交换机,消息走正常队列,被C1消费。当满足消息被拒绝,消息TTL过期,队列达到最大长度三者其一时,消息成为死信,会进入dead_exchange交换机,进入dead_queue死信队列,死信队列的信息由C2消费。

消费者1:

public class Consumer01 {//普通交换机的名称public static final String NORMAL_EXCHANGE ="normal_exchange";//死信交换机的名称public static final String DEAD_EXCHANGE = "dead_exchange";//普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";//死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel1 = RabbitMqUtils.getChannel();//声明死信和普通交换机,类型为directchannel1.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel1.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);Map<String,Object> arguments = new HashMap<>(); //设置参数//正常队列设置死信交换机arguments.put("x—dead—letter—exchange",DEAD_EXCHANGE); //****相当于正常的C1不能消费掉就通过这个交换机进行转发//设置死信RoutingKeyarguments.put("x—dead—letter—routing—key","lisi"); //***//声明普通队列channel1.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); //正常交换机不正常,需要将死信转发给死信队列//声明死信队列channel1.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通的交换机与队列channel1.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//绑定死信的交换机与死信的队列channel1.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("Consumer01接收的消息是:" + new String(message.getBody(),"UTF-8"));};channel1.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});}
}

消费者2:

public class Consumer02 {//死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel1 = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("Consumer02接收的消息是:" + new String(message.getBody(),"UTF-8"));};channel1.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});}
}

生产者:

public class Producer {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//死信消息,设置TTL时间,time to liveAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info"+i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/94018.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利用Qt实现可视化科学计算器

&#x1f4de;个人信息 学号&#xff1a;102101433 姓名&#xff1a;林堂钦 &#x1f4a1; 作业基本信息 【课程】福州大学2021级软件工程Ahttps://bbs.csdn.net/forums/ssynkqtd-05作业要求链接https://bbs.csdn.net/topics/617294583作业目标 实现一个简易计算器&…

最短路径专题2 最短距离-多终点(堆优化版)

题目&#xff1a;样例&#xff1a; 输入 6 6 0 0 1 2 0 2 5 0 3 1 2 3 2 1 2 1 4 5 1 输出 0 2 3 1 -1 -1 思路&#xff1a; 根据题意&#xff0c;数据范围也小&#xff0c;也可以用朴素版的Dijsktra来做&#xff0c;朴素版的Dijsktra我做过了一遍了&#xff0c;可以看以一下我…

MySQL - mysql服务基本操作以及基本SQL语句与函数

文章目录 操作mysql客户端与 mysql 服务之间的小九九了解 mysql 基本 SQL 语句语法书写规范SQL分类DDL库表查增 mysql数据类型数值类型字符类型日期类型 示例修改&#xff08;表操作&#xff09; DML添加数据删除数据修改数据 DQL查询多个字段条件查询聚合函数分组查询排序查询…

【数据科学】Scikit-learn[Scikit-learn、加载数据、训练集与测试集数据、创建模型、模型拟合、拟合数据与模型、评估模型性能、模型调整]

这里写目录标题 一、Scikit-learn二、加载数据三、训练集与测试集数据四、创建模型4.1 有监督学习评估器4.1.1 线性回归4.1.2 支持向量机(SVM)4.1.3 朴素贝叶斯4.1.4 KNN 4.2 无监督学习评估器4.2.1 主成分分析(PCA)4.2.2 K Means 五、模型拟合5.1 有监督学习5.2 无监督学习 六…

React18入门(第一篇)——JSX、TSX语法详解

文章目录 一、JSX 语法简介二、和 HTML 标签的几点不同三、JSX 属性四、JSX 事件4.1 简单点击事件4.2 类型限制4.3 带参数&#xff0c;箭头函数 五、插入 JS 变量六、JSX 中使用条件判断七、循环 一、JSX 语法简介 JSX - 是 JS 的扩展&#xff0c;写在 JS 代码里面&#xff0c…

STM32 DMA从存储器发送数据到串口

1.任务描述 &#xff08;1&#xff09;ds18b20测量环境温度存储到存储器&#xff08;数组&#xff09;中。 &#xff08;2&#xff09;开启DMA将数组中的内容&#xff0c;通过DMA发送到串口 存在问题&#xff0c;ds18b20读到的数据是正常的&#xff0c;但是串口只是发送其低…

WSL安装异常:WslRegisterDistribution failed with error: 0xc03a001a

简介&#xff1a;如果文件夹右上角是否都有两个相对的蓝色箭头&#xff0c;在进行安装wsl时&#xff0c;设置就会抛出 Installing WslRegisterDistribution failed with error: 0xc03a001a的异常 历史攻略&#xff1a; 卸载WSL WSL&#xff1a;运行Linux文件 WSL&#xff1…

Java下正面解除警告Unchecked cast: ‘java.lang.Object‘ to ‘java.util.ArrayList‘

就是我在反序列化时&#xff0c;遇到这样一个警告&#xff1a; Unchecked cast: java.lang.Object to java.util.ArrayList<com.work1.Student>然后我去网上查&#xff0c;有些人说用SuppressWarnings(“unchecked”)去忽略警告&#xff0c;但是我觉得作为一名合格的程序…

postgresql-自增字段

postgresql-自增字段 标识列IdentitySerial类型Sequence序列 标识列Identity -- 测试表 create table t_user( -- 标识列自增字段user_id integer generated always as identity primary key,user_name varchar(50) not null unique );-- 自动生成序列 CREATE SEQUENCE public…

【重拾C语言】三、分支程序设计(双分支和单分支程序设计、逻辑判断、多分支程序设计、枚举类型表示;典型例题:判断闰年和求一元二次方程根)

目录 前言 三、分支程序设计 3.1 判断成绩是否及格——双分支程序设计 3.2 成绩加上获奖信息—单分支程序设计 3.3 逻辑判断——布尔类型 3.4 获奖分等级——多分支程序设计 3.5 表示汽车种类——枚举类型 3.6 例题 3.6.1 例题——判断某个年份是否闰年 3.6.2 例题—…

MySQL 性能优化

MySQL 性能优化 数据库命名规范 所有数据库对象名称必须使用小写字母并用下划线分割所有数据库对象名称禁止使用 MySQL 保留关键字&#xff08;如果表名中包含关键字查询时&#xff0c;需要将其用单引号括起来&#xff09;数据库对象的命名要能做到见名识意&#xff0c;并且最…

【记录】IDA|IDA怎么查看当前二进制文件自动分析出来的内存分布情况(内存范围和读写性)

IDA版本&#xff1a;7.6 背景&#xff1a;我之前一直是直接看Text View里面的地址的首尾地址来判断内存分布情况的&#xff0c;似乎是有点不准确&#xff0c;然后才想到IDA肯定自带查看内存分布情况的功能&#xff0c;而且很简单。 可以通过View-Toolbars-Segments&#xff0c…

同学苹果ios的ipa文件应用企业代签选择签名商看看这篇文章你再去吧

同学我们要知道随着互联网的发展&#xff0c;苹果应用市场的火爆&#xff0c;越来越多的开发者加入到苹果应用开发行业中来。同时&#xff0c;苹果应用市场上的应用也在不断增多&#xff0c;用户数量也在不断增加&#xff0c;苹果应用代签是指通过第三方公司为开发者的应用进行…

计算机视觉——飞桨深度学习实战-起始篇

后面我会直接跳到实战项目&#xff0c;将计算机视觉的主要任务和目标都实现一遍&#xff0c;但是需要大家下去自己多理解和学习一下。例如&#xff0c;什么是深度学习&#xff0c;什么是计算机视觉&#xff0c;什么是自然语言处理&#xff0c;计算机视觉的主要任务有哪些&#…

14.(开发工具篇github)如何在Github配置ssh key

第一步&#xff1a;检查本地主机是否已经存在ssh key 上图表示已存在。跳第三步 第二步&#xff1a;生成ssh key ssh-keygen -t rsa -C "xxxxxx.com"第三步&#xff1a;获取ssh key公钥内容&#xff08;id_rsa.pub&#xff09; cat id_rsa.pub第四步&#xff1a;G…

CompletableFuture 异步编排

目录 CompletableFuture 的详解代码测试配置类的引入Demo1Demo2CompletableFuture的async后缀函数与不带async的函数的区别ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的区别Spring 线程池的使用业务使用多线程的原因场景一:场景二:FutureTask介绍线程池为什么要使用阻塞队…

数据结构 1.1 初学数据结构

数据结构的基本概念 数据结构在学什么&#xff1f; 如何用程序代码把现实世界的问题信息化 如何用计算机高效处理信息从而创造价值 数据&#xff1a; 数据元素、数据项&#xff1a; 数据元素——描述一个个体 数据对象——数据元素之间具有同样的性质 同一个数据对象里的数…

Springboot+Vue+Mysql实现模拟汽车保养系统(附源码)

前言 本项目基于springbootvue搭建的汽车保养的系统&#xff0c;页面较为粗糙&#xff0c;前端好的小伙伴可自行优化。 项目环境 -环境框架后端JDK1.8SpringBootmybatisPlus前端NodeJS16.0Vue2.0ElementPlus数据库MySQL8.0- 数据库设计 数据表备注banner轮播图表car用户汽…

linux命令行配置音频设备

linux命令行配置音频设备 TLTR在linux命令行播放音乐cmus需要开始声音条件功能才能调节播放的音量&#xff0c;看这个链接&#xff0c;继续折腾&#xff0c;have fun! TLTR 以archLinux为例&#xff0c;把下面软件都装一遍。 sudo pacman -S alsa-utils sudo pacman -S alsa-…

算法-数学-斜率-直线上最多的点数

算法-数学-斜率-直线上最多的点数 1 题目概述 1.1 题目出处 https://leetcode.cn/problems/max-points-on-a-line/ 1.2 题目描述 给你一个数组 points &#xff0c;其中 points[i] [xi, yi] 表示 X-Y 平面上的一个点。求最多有多少个点在同一条直线上。 2 暴力搜索斜率…