RMQ从入门到精通

一.概述与安装

//RabbitMQ
//1.核心部分-高级部分-集群部分
//2.什么是MQ 消息队列message queue 先入先出原则;消息通信服务
//3.MQ的大三功能 流量消峰 应用解耦 消息中间件
//(1)人-订单系统(1万次/S)—> 人 - MQ(流量消峰,对访问人员进行排队) -订单系统(保护系统不宕机)
//(2)订单系统-支付/库存/物流系统—> 订单系统-MQ-支付/库存/物流系统
//(3)A -API -B  —> A - MQ -B  这样可以通过MQ完成时告知A
//4.MQ的分类 ActiveMQ(老) Kafka(大数据的杀手锏) RocketMQ RabbitMQ(中小型公司推荐)
//5.RabbitMQ概念
//(1)就是一个快递站  发件人-快递员-快递站MQ-快递员-收件人
//(2)生产者-MQ(1交换机、N队列)-1个队列对应1个消费者
//6.核心部分 6大模式
//(1)简单模式Hello World
//(2)工作模式Work queues
//(3)发布订阅模式Publish/Subscribe
//(4)路由模式Routing
//(5)主体模式Topics
//(6)发布确认模式Publisher Confirms
//(7)Broker消息实体(可以有多个交换机Exchange(可以有多个队列Queue));Connection(多个Channel)
// Producer生产者 Consumer消费者 Binding绑定,交换机和queue之间的虚拟连接
//7.RMQ的安装
//集中下载 链接:https://pan.baidu.com/s/1NJfYnLT4DN-uu-uyIXzA4w  提取码:HIT0
//(1)先安装erlang  yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel
//(2)下载 wget http://erlang.org/download/otp_src_22.0.tar.gz
//(3)解压 tar -zxvf otp_src_22.0.tar.gz
//(4)移动 mv otp_src_22.0 /usr/local/
//(5)切换目录 cd /usr/local/otp_src_22.0/
//(6)创建即将安装的目录 mkdir ../erlang
//(7)配置安装路径 ./configure --prefix=/usr/local/erlang
//(8)安装 make install
//(9)查看一下是否安装成功 ll /usr/local/erlang/bin
//(10)添加环境变量 echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile
//(11)刷新环境变量 source /etc/profile
//(12)甩一条命令 erl    halt(). 退出
//---------------------安装RMQ-------------------------
//(13)下载 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz
//(14)由于是tar.xz格式的所以需要用到xz,没有的话就先安装  yum install -y xz
//(15)第一次解压 /bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz
//(16)第二次解压 tar -xvf rabbitmq-server-generic-unix-3.7.15.tar
//(17)移动 mv rabbitmq_server-3.7.15/ /usr/local/
//(18)改名 mv /usr/local/rabbitmq_server-3.7.15  /usr/local/rabbitmq
//(19)配置环境变量 echo 'export PATH=$PATH:/usr/local/rabbitmq/sbin' >> /etc/profile
//(20)刷新环境变量 source /etc/profile
//(21)创建配置目录 mkdir /etc/rabbitmq
//(22)启动:rabbitmq-server -detached
//(23)停止:rabbitmqctl stop
//(24)状态:rabbitmqctl status
//(25)开放端口 firewall-cmd --zone=public --add-port=15672/tcp --permanent     firewall-cmd --zone=public --add-port=5672/tcp --permanent
//(26)开启web插件 rabbitmq-plugins enable rabbitmq_management
//(27)访问 http://wdfgdzx.top:15672/   默认账号密码:guest guest(这个账号只允许本机访问)
//(28)查看所有用户 rabbitmqctl list_users
//(29)添加一个用户 rabbitmqctl add_user xlliu24 s19911009!
//(30)配置权限 rabbitmqctl set_permissions -p "/" xlliu24 ".*" ".*" ".*"
//(31)查看用户权限 rabbitmqctl list_user_permissions xlliu24
//(32)设置tag   rabbitmqctl set_user_tags xlliu24 administrator
//(33)删除用户(安全起见,删除默认用户) rabbitmqctl delete_user guest
//(34)然后用新用户登录,成功后看到界面

二.如何使用RMQ

//1.Hello World
//(1)引入maven包  amqp-client commons-io
//(2)P(生产者) -发消息-队列hello(中间件)- 接受消息-C(消费者)
//生产者
package com.day.controller;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//发消息
public class Producer {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception {//创建工厂ConnectionFactory connectionFactory=new ConnectionFactory();//工厂IP 连接RMQ的队列connectionFactory.setHost("47.105.174.97");//用户名密码connectionFactory.setUsername("xlliu24");connectionFactory.setPassword("s19911009!");//创建连接Connection connection =connectionFactory.newConnection();//获取信道Channel channel =connection.createChannel();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME,true,false,false,null);//发消息String message="Hello World";/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕...");}
}
//消费者
package com.day.controller;
import com.rabbitmq.client.*;
//消费者
public class Consumer {//队列的名称public static final String QUEUE_NAME="Hello";//接收消息public static void main(String[] args) throws Exception {//创建连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("47.105.174.97");connectionFactory.setUsername("xlliu24");connectionFactory.setPassword("s19911009!");//创建新链接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println(new String(message.getBody()));};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println("消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */channel.basicConsume("hello",true,deliverCallback,cancelCallback);}
}
------------------------------------
//2.Work Queues 工作队列
//(1)生产者-大量发消息-队列hello-接受消息-N个消费者(工作线程)
//(2)注意:一个消息只能被处理一次,不能被处理多次。所以工作线程采用的是轮训分发消息
// 抽取信道工具类
package com.day.controller;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RMQUtil{public static Channel getChannel() throws Exception{//创建连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("47.105.174.97");connectionFactory.setUsername("xlliu24");connectionFactory.setPassword("s19911009!");//创建新链接Connection connection=connectionFactory.newConnection();return connection.createChannel();}
}
-----------------------
package com.day.controller;
import com.rabbitmq.client.Channel;
//生产者,发送大量的消息
public class Producer {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception {//获取信道Channel channel =RMQUtil.getChannel();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME,true,false,false,null);//发消息String message="Hello World";/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */for(int i=0;i<1000;i++){channel.basicPublish("",QUEUE_NAME,null,(message+" "+i).getBytes());System.out.println("消息发送完毕...");}}
}
-------------------
package com.day.controller;
import com.rabbitmq.client.*;
//消费者
public class Consumer {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C0接收到的消息: "+new String(message.getBody()));};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C0等待接受消息...");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
----------------
package com.day.controller;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C1接收到的消息: "+new String(message.getBody()));};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C1等待接受消息...");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
----------------------
package com.day.controller;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer2 {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C2接收到的消息: "+new String(message.getBody()));};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C2等待接受消息...");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
-----------------控制台打印信息如下----------------------
C0接收到的消息: Hello World 0
C0接收到的消息: Hello World 3
C0接收到的消息: Hello World 6
C0接收到的消息: Hello World 9
----------
C1接收到的消息: Hello World 1
C1接收到的消息: Hello World 4
C1接收到的消息: Hello World 7
C1接收到的消息: Hello World 10
-----------
C2接收到的消息: Hello World 2
C2接收到的消息: Hello World 5
C2接收到的消息: Hello World 8
C2接收到的消息: Hello World 11//3.消息应答
//(1)即消费者处理完毕后-应答-生产者删除消息
//(2)自动应答对环境要求高,并不可取;
//(3)手动应答:basicAck(肯定) basicNack basicReject
//(4)批量应答multiple 当前8 true 5 6 7 8都确认, false只会应答8,建议使用false
//(5)消息自动重新入队,当某个通道发生异常时,RMQ将了解到消息未完全处理,并将对其重新排队。让其他通道处理,保证消息的不丢失与处理。
//(6)目的:手动应答保证消息的不丢失。
// 抽取信道工具类
package com.day.controller;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RMQUtil{public static Channel getChannel() throws Exception{//创建连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("47.105.174.97");connectionFactory.setUsername("xlliu24");connectionFactory.setPassword("s19911009!");//创建新链接Connection connection=connectionFactory.newConnection();return connection.createChannel();}
}
--------
package com.day.controller;
import com.rabbitmq.client.Channel;
//生产者,发送大量的消息
public class Producer {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception {//获取信道Channel channel =RMQUtil.getChannel();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME,true,false,false,null);//发消息String message="Hello World";/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */for(int i=0;i<10;i++){channel.basicPublish("",QUEUE_NAME,null,(message+" "+i).getBytes("UTF-8"));System.out.println((message+" "+i)+" 消息发送完毕...");}}
}
--------
package com.day.controller;
import com.rabbitmq.client.*;
//消费者
public class Consumer {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{try {Thread.sleep(30000);} catch (Exception e) {e.printStackTrace();}System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8"));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C0等待接受消息...");channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
-------------
package com.day.controller;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;//消费者
public class Consumer1 {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8"));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C1等待接受消息...");channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
------------如果C0在等待过程中宕机或者发生异常,全部消息有C1处理------------------
C0等待接受消息...
C0接收到的消息: Hello World 0
--------------------
C1等待接受消息...
C1接收到的消息: Hello World 1
C1接收到的消息: Hello World 3
C1接收到的消息: Hello World 5
C1接收到的消息: Hello World 7
C1接收到的消息: Hello World 9
C1接收到的消息: Hello World 2
C1接收到的消息: Hello World 4
C1接收到的消息: Hello World 6
C1接收到的消息: Hello World 8//4.队列RMQ持久化
//(1)channel.queueDeclare(QUEUE_NAME, true,false,false,null);
//(2)持久化后重启RMQ后仍然存在//5.消息持久化
//(1)channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8"));
//(2)但是不是绝对的,如果想绝对要参考后面的发布确认章节//6.不公平分发
//(1)轮训分发—>
//(2)问题在两个消费者就会出现,C1处理快,C0处理慢,而分发任务一致,就会出现能者不多劳。
//(3)由消费者设置分发策略: channel.basicQos(1);
package com.day.controller;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
//生产者,发送大量的消息
public class Producer {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception {//获取信道Channel channel =RMQUtil.getChannel();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME, true,false,false,null);//发消息String message="Hello World";/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */for(int i=0;i<10;i++){channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8"));System.out.println((message+" "+i)+" 消息发送完毕...");}}
}
package com.day.controller;
import com.rabbitmq.client.*;
//消费者
public class Consumer {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//设置分发策略channel.basicQos(1);//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{try {Thread.sleep(30000);} catch (Exception e) {e.printStackTrace();}System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8"));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C0等待接受消息...");channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
package com.day.controller;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//设置分发策略channel.basicQos(1);//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8"));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C1等待接受消息...");channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
-------------------控制台信息---------------------------------
C0等待接受消息...
C0接收到的消息: Hello World 0
C1接收到的消息: Hello World 1
C1接收到的消息: Hello World 2
C1接收到的消息: Hello World 3
C1接收到的消息: Hello World 4
C1接收到的消息: Hello World 5
C1接收到的消息: Hello World 6
C1接收到的消息: Hello World 7
C1接收到的消息: Hello World 8
C1接收到的消息: Hello World 9//7.预取值
//(1)就是预先指定C0分到多少,C1分到多少
//(2)也是消费者设置 channel.basicQos(3); channel.basicQos(7);
package com.day.controller;
import com.rabbitmq.client.*;
//消费者
public class Consumer {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//设置分发策略/预取值channel.basicQos(3);//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{try {Thread.sleep(30000);} catch (Exception e) {e.printStackTrace();}System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8"));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C0等待接受消息...");channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
package com.day.controller;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {//队列的名称public static final String QUEUE_NAME="hello";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//设置分发策略/预取值channel.basicQos(7);//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8"));// 1.标记tag 2.不批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//取消消费CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */System.out.println("C1等待接受消息...");channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
---控制台信息---
C1等待接受消息...
C1接收到的消息: Hello World 1
C1接收到的消息: Hello World 3
C1接收到的消息: Hello World 5
C1接收到的消息: Hello World 6
C1接收到的消息: Hello World 7
C1接收到的消息: Hello World 8
C1接收到的消息: Hello World 9
---
C0接收到的消息: Hello World 0
C0接收到的消息: Hello World 2
C0接收到的消息: Hello World 4//8.发布确认
//(1)是解决消息不丢失的重要环节
//(2)生产者-发消息-队列hello-
//(3)1设置队列持久化->2设置消息持久化—>3发布确认(这里第3条才能确认消息真的保存在磁盘上了)
//(4)开启发布确认的方法 channel.confirmSelect();
//(5)单个确认发布 批量确认发布 异步确认发布三种方法
//(6)单个确认发布: 发一条确认一条,速度慢
//(7)批量确认发布: 34集
//(8)异步确认发布: 第三种最牛批,性能最厉害
package com.day.controller;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
//生产者,发送大量的消息
public class Producer {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception {//单个确认 耗时31830ms//publicMessageIndividually();//批量确认 耗时1660ms//publicMessageBatch();//异步确认 耗时1102mspublicMessageAsync();}//单个确认public static void publicMessageIndividually() throws Exception{long startTime=System.currentTimeMillis();//获取信道Channel channel =RMQUtil.getChannel();//开启发布确认channel.confirmSelect();//创建队列/*** 1.队列名称* 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘* 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许* 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反* 5.其他参数* */channel.queueDeclare(QUEUE_NAME, true,false,false,null);//发消息String message="Hello World";/*** 1.发送到那个交换机* 2.路由的key值 本次是队列的名称* 3.其他参数信息* 4.发送的消息* */for(int i=0;i<1000;i++){channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8"));System.out.println((message+" "+i)+" 消息发送完毕...");//进行发布确认boolean flag=channel.waitForConfirms();if(flag){System.out.println("确认发送成功...");}else{System.err.println("不确认发送成功...");}}long endTime=System.currentTimeMillis();System.out.println("耗时"+(endTime-startTime)+"ms");}//批量发布确认public static void publicMessageBatch() throws Exception{long startTime=System.currentTimeMillis();//获取信道Channel channel =RMQUtil.getChannel();//开启发布确认channel.confirmSelect();//创建队列channel.queueDeclare(QUEUE_NAME, true,false,false,null);//发消息//批量确认消息大小int batchSize=100;String message="Hello World";for(int i=0;i<1000;i++){channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8"));System.out.println((message+" "+i)+" 消息发送完毕...");//判断达到100条,批量确认一次if(i%batchSize==0){boolean flag=channel.waitForConfirms();if(flag){System.out.println("确认100条发送成功...");}else{System.err.println("不确认100条发送成功...");}}}long endTime=System.currentTimeMillis();System.out.println("耗时"+(endTime-startTime)+"ms");}//异步确认发布//生产消息的时候一一编号,也叫给ID或者提取特征,有无问题broker会通知你//速度最快,效率最高,实用性最大public static void publicMessageAsync() throws Exception{long startTime=System.currentTimeMillis();//线程安全有序的一个哈希表 适用于高并发的情况下ConcurrentSkipListMap<Long,Object> concurrentSkipListMap=new ConcurrentSkipListMap<>();//获取信道Channel channel =RMQUtil.getChannel();//开启发布确认channel.confirmSelect();//消息监听器,监听消息是否发送成功了//确认成功的回调函数ConfirmCallback ackCallBack=(deliveryTag,multiple) ->{System.out.println("确认发送成功的消息 "+deliveryTag);//2.删除已经确认的消息//是否批量确认if(multiple){ConcurrentNavigableMap<Long,Object> concurrentNavigableMap=concurrentSkipListMap.headMap(deliveryTag);System.out.println("即将删除 "+concurrentNavigableMap);concurrentNavigableMap.clear();}else{System.out.println("即将删除 "+concurrentSkipListMap.get(deliveryTag));concurrentSkipListMap.remove(deliveryTag);}};//确认失败的回调函数ConfirmCallback nackCallBack=(deliveryTag,multiple) ->{System.err.println("不能确认发送成功的消息 "+deliveryTag);//打印一下未确认的消息String temp= (String) concurrentSkipListMap.get(deliveryTag);System.out.println("不能确认发送成功的消息 "+temp);};channel.addConfirmListener(ackCallBack,nackCallBack);//创建队列channel.queueDeclare(QUEUE_NAME, true,false,false,null);String message="异步确认... Hello World";for(int i=0;i<1000;i++){//1.此处记录所有要发送的消息concurrentSkipListMap.put(channel.getNextPublishSeqNo()-2,(message+" "+i));channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8"));//System.out.println(channel.getNextPublishSeqNo());}Thread.sleep(5000);System.err.println(concurrentSkipListMap.size());long endTime=System.currentTimeMillis();System.out.println("耗时"+(endTime-startTime)+"ms");}
}

三.发布订阅

//1.交换机
//(1)生产者-发消息-队列-消费者
//(2)原来的消息只能被消费一次,如果做到1个消息被多个消费者消费呢?
//(3)生产者-交换机-RoutingKey-队列(仍是消息只能被消费一次)-消费者
//          -交换机-RoutingKey-队列(仍是消息只能被消费一次)-消费者...
//(4)RMQ把消息给队列,必须走交换机,不指定是默认的交换机。
//(5)交换机的类型:直接direct 主题topic 标题headers 扇出fanout 无名""
//(6)绑定bindings 用RoutingKey可以区分队列//2.Fanout 广播 发布订阅模式 根源是发把消息发给交换机,然后用RoutingKey关联到多个队列,给多个队列同时发消息
//(1)P-X-n个Q-n个消费者
package com.day.controller;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
//生产者,发送大量的消息
public class Producer {//交换机名public static final String EXCHANGE_NAME="logs";//发消息public static void main(String[] args) throws Exception {Channel channel =RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner=new Scanner(System.in);while(scanner.hasNext()){String message=scanner.next();channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println("生产者发出消息: "+message);}}
}
--------------
package com.day.controller;
import com.rabbitmq.client.*;
//消费者
public class Consumer {//队列的名称public static final String QUEUE_NAME="public0";//交换机名public static final String EXCHANGE_NAME="logs";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");System.out.println("C0等待,把接受到的消息打印在屏幕上...");//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
--------------------------------
package com.day.controller;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {//队列的名称public static final String QUEUE_NAME="public1";public static final String EXCHANGE_NAME="logs";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");System.out.println("C1等待,把接受到的消息打印在屏幕上...");//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}//3.Direct Exchange直接交换机
//(1)路由模式,想给谁传给谁传,根据RoutingKey不同指定轻松实现。绑定相同的RoutingKey就是fanout模式了
package com.day.controller;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
//生产者,发送大量的消息
public class Producer {//交换机名public static final String EXCHANGE_NAME="direct_logs";public static final String CONSOLE_INFO="info";public static final String CONSOLE_WARNING="warning";public static final String DISK_ERROR="error";//发消息public static void main(String[] args) throws Exception {Channel channel =RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);Scanner scanner=new Scanner(System.in);System.out.println("生产者准备从控制台获取信息发送...");while(scanner.hasNext()){String message=scanner.next();channel.basicPublish(EXCHANGE_NAME,DISK_ERROR, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println("生产者发出消息: "+message);}}
}
----------
package com.day.controller;
import com.rabbitmq.client.*;
//消费者
public class Consumer {//队列的名称public static final String QUEUE_NAME="console";//交换机名public static final String EXCHANGE_NAME="direct_logs";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");System.out.println("C0等待,把接受到的消息打印在屏幕上...");//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
----------
package com.day.controller;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {//队列的名称public static final String QUEUE_NAME="disk";public static final String EXCHANGE_NAME="direct_logs";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");System.out.println("C1等待,把接受到的消息打印在屏幕上...");//声明 接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}//4.Topics 主题交换机
//(1)P-X-RoutingKey-
//(2)主题交换机的RoutingKey不能随意写,必须是.隔开的单词列表;*代表一个单词,#代表0或多个单词
//(3)支持匹配模式,*.orange.* lazy.# *.*.rabbit
//(4)如果绑定# 则这个对垒将接受所有数据,有点像fanout
//(5)如果没有# *就是direct交换机,这不就是多个正则吗
package com.day.controller;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
//生产者,发送大量的消息
public class Producer {//交换机名public static final String EXCHANGE_NAME="topic_logs";//发消息public static void main(String[] args) throws Exception {Channel channel =RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);Map<String,String> bindingKeyMap=new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到");bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");bindingKeyMap.put("lazy.pink.rabbit","Q2接收一次");bindingKeyMap.put("quick.brown.fox","被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","Q2");for(String key:bindingKeyMap.keySet()){String message=bindingKeyMap.get(key);//队列名称是通过key指定的哦channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息: "+message);}}
}
------
package com.day.controller;
import com.rabbitmq.client.*;
//消费者
public class Consumer {//队列的名称public static final String QUEUE_NAME="Q1";//交换机名public static final String EXCHANGE_NAME="topic_logs";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");System.out.println("C0等待,把接受到的消息打印在屏幕上...");//接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")+"。通过队列 "+QUEUE_NAME+"。绑定键: "+message.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
------
package com.day.controller;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {//队列的名称public static final String QUEUE_NAME="Q2";public static final String EXCHANGE_NAME="topic_logs";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);//队列,生成临时队列,队列名称是随机的channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定交换机与队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");System.out.println("C1等待,把接受到的消息打印在屏幕上...");//接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")+"。通过队列 "+QUEUE_NAME+"。绑定键: "+message.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}//5.死信
//(1)消息拒绝 效果过期 队列达到最大长度
//(2)开启消费者0,然后关闭,然后运行生产者发消息,会发现消息在normal_queue,10S后在dead_queue的变化
//(3)无法被消费的消息,由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信
//(4)死信队列机制,消费时发生异常,将消息放到死信队列中,防止消息的丢失
//(5)死信的来源:消息TTL过期、队列达到最大长度、消息被拒绝并且requeue=false;
package com.day.controller;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
//生产者,发送大量的消息
public class Producer {//交换机名public static final String NORMAL_EXCHANGE="normal_exchange";//发消息public static void main(String[] args) throws Exception {Channel channel =RMQUtil.getChannel();//单位是ms 即设置10SAMQP.BasicProperties basicProperties=new AMQP.BasicProperties().builder().expiration("10000").build();for(int i=0;i<10;i++){String message="生产者发送的消息: "+i;//TTL -TIME TO LIVEchannel.basicPublish(NORMAL_EXCHANGE,"normalBindLine",basicProperties,message.getBytes("UTF-8"));System.out.println("生产者发出消息: "+message);}}
}
------
package com.day.controller;
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
//消费者
public class Consumer {//普通交换机名public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机public static final String DEAD_EXCHANGE="dead_exchange";//两个队列public static final String NORMAL_QUEUE="normal_queue";public static final String DEAD_QUEUE="dead_queue";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//-----------------------------------//声明交换机channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//-----------------------------------//声明队列-要使用特殊的参数才能转发到死信队列Map<String,Object> argumentsMap=new HashMap<>();//设置转发的交换机//argumentsMap.put("x-message-ttl",10000);//也可以在发消息时指定argumentsMap.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置RoutingKeyargumentsMap.put("x-dead-letter-routing-key","deadBindLine");channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap);//死信队列channel.queueDeclare(DEAD_QUEUE,true,false,false,null);//-----------------------------------//绑定交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normalBindLine");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"deadBindLine");//-----------------------------------System.out.println("C0等待,把接受到的消息打印在屏幕上...");//接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")+"。通过队列 "+NORMAL_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
------
package com.day.controller;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();System.out.println("C1等待,把接受到的消息打印在屏幕上...");//接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")+"。通过队列 "+DEAD_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=consumerTag ->{System.out.println(consumerTag+" 消费消息被中断...");};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}//5.死信
//(1)队列达到最大长度
package com.day.controller;
import com.rabbitmq.client.Channel;
//生产者,发送大量的消息
public class Producer {//交换机名public static final String NORMAL_EXCHANGE="normal_exchange";//发消息public static void main(String[] args) throws Exception {Channel channel =RMQUtil.getChannel();//单位是ms 即设置10S/* AMQP.BasicProperties basicProperties=new AMQP.BasicProperties().builder().expiration("10000").build();*/for(int i=0;i<10;i++){String message="生产者发送的消息: "+i;channel.basicPublish(NORMAL_EXCHANGE,"normalBindLine",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息: "+message);}}
}
------
package com.day.controller;
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
//消费者
public class Consumer {//普通交换机名public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机public static final String DEAD_EXCHANGE="dead_exchange";//两个队列public static final String NORMAL_QUEUE="normal_queue";public static final String DEAD_QUEUE="dead_queue";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//-----------------------------------//声明交换机channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//-----------------------------------//声明队列-要使用特殊的参数才能转发到死信队列Map<String,Object> argumentsMap=new HashMap<>();//设置转发的交换机//argumentsMap.put("x-message-ttl",10000);//也可以在发消息时指定argumentsMap.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置RoutingKeyargumentsMap.put("x-dead-letter-routing-key","deadBindLine");argumentsMap.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap);//死信队列channel.queueDeclare(DEAD_QUEUE,true,false,false,null);//-----------------------------------//绑定交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normalBindLine");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"deadBindLine");//-----------------------------------System.out.println("C0等待,把接受到的消息打印在屏幕上...");//接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")+"。通过队列 "+NORMAL_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=consumerTage ->{System.out.println(consumerTage+" 消费消息被中断...");};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
------
package com.day.controller;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();System.out.println("C1等待,把接受到的消息打印在屏幕上...");//接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")+"。通过队列 "+DEAD_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=consumerTag ->{System.out.println(consumerTag+" 消费消息被中断...");};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}//5.死信
//(1)消息被拒绝
package com.day.controller;
import com.rabbitmq.client.Channel;
//生产者,发送大量的消息
public class Producer {//交换机名public static final String NORMAL_EXCHANGE="normal_exchange";//发消息public static void main(String[] args) throws Exception {Channel channel =RMQUtil.getChannel();//单位是ms 即设置10S/* AMQP.BasicProperties basicProperties=new AMQP.BasicProperties().builder().expiration("10000").build();*/for(int i=0;i<10;i++){String message="生产者发送的消息: "+i;channel.basicPublish(NORMAL_EXCHANGE,"normalBindLine",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息: "+message);}}
}
------
package com.day.controller;
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
//消费者
public class Consumer {//普通交换机名public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机public static final String DEAD_EXCHANGE="dead_exchange";//两个队列public static final String NORMAL_QUEUE="normal_queue";public static final String DEAD_QUEUE="dead_queue";//接收消息public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();//-----------------------------------//声明交换机channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//-----------------------------------//声明队列-要使用特殊的参数才能转发到死信队列Map<String,Object> argumentsMap=new HashMap<>();//设置转发的交换机//argumentsMap.put("x-message-ttl",10000);//也可以在发消息时指定argumentsMap.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置RoutingKeyargumentsMap.put("x-dead-letter-routing-key","deadBindLine");channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap);//死信队列channel.queueDeclare(DEAD_QUEUE,true,false,false,null);//-----------------------------------//绑定交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normalBindLine");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"deadBindLine");//-----------------------------------System.out.println("C0等待,把接受到的消息打印在屏幕上...");//接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{String msg=new String(message.getBody(),"UTF-8");if(msg.contains("5")){//拒绝且不放回原队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else{System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")+"。通过队列 "+NORMAL_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey());channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};CancelCallback cancelCallback=consumerTag ->{System.out.println(consumerTag+" 消费消息被中断...");};//必须开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);}
}
------
package com.day.controller;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Consumer1 {public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception {//信道Channel channel=RMQUtil.getChannel();System.out.println("C1等待,把接受到的消息打印在屏幕上...");//接受消息DeliverCallback deliverCallback=(consumerTag,message) ->{System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")+"。通过队列 "+DEAD_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=consumerTag ->{System.out.println(consumerTag+" 消费消息被中断...");};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}
//56集

四.Spinrgboot整合RMQ

//1.延迟队列
//(1)延迟队列本质上就是TTL过期的死信队列
//(2)P-X(普通交换机)-Y(延迟交换机)-3个队列,实现两种不同的延迟效果10S和40S
//(3)花费了我2个小时因为Consumer类中的Channel导错了包,应该为import com.rabbitmq.client.Channel;
//(4)学会了lombok.extern.slf4j.Slf4j与yml文件配置log.info的使用
//(5)见识了Springboot注解配置的强大。只要功夫深,问题能解决
------POM.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.5.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>SpringBoot</groupId><artifactId>springboot-maven</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-maven</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><distributionManagement><repository><id>releases</id><name>Nexus Release Repository</name><url>http://wdfgdzx.top:8081/nexus/content/repositories/releases/</url></repository><snapshotRepository><id>snapshots</id><name>Nexus Snapshot Repository</name><url>http://wdfgdzx.top:8081/nexus/content/repositories/snapshots/</url></snapshotRepository></distributionManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId></dependency><!--整合MyBatis--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.0</version></dependency><!--数据库连接池--><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.12</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>6.0.6</version></dependency><!--redis依赖包--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--   <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency>--><!-- Thymeleaf 自动配置 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><!-- 允许使用非严格的 HTML 语法 --><dependency><groupId>net.sourceforge.nekohtml</groupId><artifactId>nekohtml</artifactId><version>1.9.22</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-test</artifactId></dependency><!--SpringBoot热部署配置 --><!--  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency>--><dependency><groupId>org.jetbrains</groupId><artifactId>annotations</artifactId><version>13.0</version><scope>compile</scope></dependency><!--json--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.46</version></dependency><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.9</version></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>3.9.1</version></dependency><dependency><groupId>com.squareup.okio</groupId><artifactId>okio</artifactId><version>1.15.0</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.6</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.4.10</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpmime</artifactId><version>4.5.6</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.6.2</version></dependency><dependency><groupId>net.minidev</groupId><artifactId>json-smart</artifactId></dependency><!--Rich文本开始--><dependency><groupId>com.gitee.qdbp.thirdparty</groupId><artifactId>ueditor</artifactId><version>1.4.3.3</version></dependency><!-- https://mvnrepository.com/artifact/org.json/json --><dependency><groupId>org.json</groupId><artifactId>json</artifactId><version>20160810</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.4</version></dependency><!-- https://mvnrepository.com/artifact/commons-fileupload/commons-fileupload --><dependency><groupId>commons-fileupload</groupId><artifactId>commons-fileupload</artifactId><version>1.3.1</version></dependency><!-- https://mvnrepository.com/artifact/commons-codec/commons-codec --><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.9</version></dependency><!--Rich文本结束--><!-- 读取Excel --><dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>4.1.2</version></dependency><!--顺丰本地jar包放置与引入--><dependency><groupId>com.iflytek.msp.sfexpress</groupId><artifactId>express-sdk</artifactId><version>2.1.5</version><scope>system</scope><systemPath>${project.basedir}/src/main/resources/libs/sf-csim-express-sdk-V2.1.5.jar</systemPath></dependency><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 --><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><!-- <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.1.5.RELEASE</version></plugin>--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><includeSystemScope>true</includeSystemScope></configuration><version>2.1.5.RELEASE</version></plugin><!-- <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><fork>true</fork><addResources>true</addResources></configuration></plugin>--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
</project>
------yml
server:port: 8001tomcat:accesslog:buffered: truedirectory: /home/A/SpringBootenabled: truefile-date-format: .yyyy-MM-ddpattern: commonprefix: access_logrename-on-rotate: falserequest-attributes-enabled: falserotate: truesuffix: .log
spring:#devtools:#restart:#enabled=true: #支持热部署  可能导致重启,然后非实时语音转写报错。rabbitmq:host: wdfgdzx.topport: 5672username: xlliu24password: s19911009!redis: #配置redishost: wdfgdzx.topprot: 6379datasource:name: mydbtype: com.alibaba.druid.pool.DruidDataSourceurl: jdbc:mysql://wdfgdzx.top:3306/mydb?serverTimezone=GMT%2b8username: rootpassword: s19911009!driver-class-name: com.mysql.cj.jdbc.Driverthymeleaf:prefix: classpath:/site/check-template-location: true  #check-tempate-location: 检查模板路径是否存在enabled: trueencoding: UTF-8content-type: text/htmlcache: falsemode: HTMLsuffix: .htmlservlet:multipart: #配置文件上传max-file-size: 1000MB #设置上传的单个文件最大值,单位可以是 MB、KB,默认为 1MBmax-request-size: 1024MB #设置多文件上传时,单次内多个文件的总量的最大值,单位可以是 MB、KB,默认为 10 M
mybatis:mapper-locations: classpath*:/mybatis/*Mapper.xml
logging:level:root: info
-------交换机声明、队列声明、绑定
package com.day.controller;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RMQConfig {//普通交换机private static final String COMMON_EXCHANGE="X";//死信交换机private static final String DEAD_EXCHANGE="Y";//普通队列private static final String COMMON_QUEUE_A="QA";private static final String COMMON_QUEUE_B="QB";//死信队列private static final String DEAD_QUEUE_D="QD";//----------------------------------//声明交换@Bean("commonExchange")public DirectExchange commonExchange(){return new DirectExchange(COMMON_EXCHANGE);}@Bean("deadExchange")public DirectExchange deadExchange(){return new DirectExchange(DEAD_EXCHANGE);}//----------------------------------//声明队列@Bean("commonQueueA")public Queue commonQueueA(){Map<String,Object> arguments=new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信Routing Keyarguments.put("x-dead-letter-routing-key","YD");//设置TTLarguments.put("x-message-ttl",10000);return QueueBuilder.durable(COMMON_QUEUE_A).withArguments(arguments).build();}@Bean("commonQueueB")public Queue commonQueueB(){Map<String,Object> arguments=new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信Routing Keyarguments.put("x-dead-letter-routing-key","YD");//设置TTLarguments.put("x-message-ttl",40000);return QueueBuilder.durable(COMMON_QUEUE_B).withArguments(arguments).build();}@Bean("deadQueueD")public Queue deadQueueD(){return QueueBuilder.durable(DEAD_QUEUE_D).build();}//----------------------------------//绑定@Beanpublic Binding commonQueueABindingCommonExchange(@Qualifier("commonQueueA") Queue commonQueueA,@Qualifier("commonExchange") DirectExchange commonExchange){return BindingBuilder.bind(commonQueueA).to(commonExchange).with("XA");}@Beanpublic Binding commonQueueBBindingCommonExchange(@Qualifier("commonQueueB") Queue commonQueueB,@Qualifier("commonExchange") DirectExchange commonExchange){return BindingBuilder.bind(commonQueueB).to(commonExchange).with("XB");}@Beanpublic Binding deadQueueDBindingDeadExchange(@Qualifier("deadQueueD") Queue deadQueueD,@Qualifier("deadExchange") DirectExchange deadExchange){return BindingBuilder.bind(deadQueueD).to(deadExchange).with("YD");}
}
------生产者 http://localhost:8001/ttl/sendMsg/Hello
package com.day.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.Serializable;
import java.util.Date;
//发送延迟消息
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController implements Serializable{@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){//System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message);log.info("当前时间: {},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列: "+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列: "+message);}
}
-----消费者
package com.day.controller;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Date;
//消费者
@Slf4j
@Component
public class Consumer implements Serializable {@RabbitListener(queues="QD")public void receiveD(Message message, Channel channel) throws Exception{byte[] body=message.getBody();String msg=new String(body, Charset.forName("UTF-8"));log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}
------SwaggerConfig 非必须
package com.day.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig(){return new Docket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo(){return new ApiInfoBuilder().title("RMQ接口文档").description("本文描述了RMQ的微服务接口定义").version("1.0").contact(new Contact("wdfgdzx","http://wdfgdzx.top","wdfgdzx@163.com")).build();}
}

image.png

//2.延迟队列的优化
package com.day.controller;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RMQConfig {//普通交换机private static final String COMMON_EXCHANGE="X";//死信交换机private static final String DEAD_EXCHANGE="Y";//普通队列private static final String COMMON_QUEUE_A="QA";private static final String COMMON_QUEUE_B="QB";//延迟队列优化private static final String COMMON_QUEUE_C="QC";//死信队列private static final String DEAD_QUEUE_D="QD";//----------------------------------//声明交换@Bean("commonExchange")public DirectExchange commonExchange(){return new DirectExchange(COMMON_EXCHANGE);}@Bean("deadExchange")public DirectExchange deadExchange(){return new DirectExchange(DEAD_EXCHANGE);}//----------------------------------//声明队列@Bean("commonQueueA")public Queue commonQueueA(){Map<String,Object> arguments=new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信Routing Keyarguments.put("x-dead-letter-routing-key","YD");//设置TTLarguments.put("x-message-ttl",10000);return QueueBuilder.durable(COMMON_QUEUE_A).withArguments(arguments).build();}//绑定@Beanpublic Binding commonQueueABindingCommonExchange(@Qualifier("commonQueueA") Queue commonQueueA,@Qualifier("commonExchange") DirectExchange commonExchange){return BindingBuilder.bind(commonQueueA).to(commonExchange).with("XA");}//声明队列@Bean("commonQueueB")public Queue commonQueueB(){Map<String,Object> arguments=new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信Routing Keyarguments.put("x-dead-letter-routing-key","YD");//设置TTLarguments.put("x-message-ttl",40000);return QueueBuilder.durable(COMMON_QUEUE_B).withArguments(arguments).build();}//绑定@Beanpublic Binding commonQueueBBindingCommonExchange(@Qualifier("commonQueueB") Queue commonQueueB,@Qualifier("commonExchange") DirectExchange commonExchange){return BindingBuilder.bind(commonQueueB).to(commonExchange).with("XB");}//声明队列@Bean("commonQueueC")public Queue commonQueueC(){Map<String,Object> arguments=new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信Routing Keyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(COMMON_QUEUE_C).withArguments(arguments).build();}//绑定@Beanpublic Binding commonQueueCBindingCommonExchange(@Qualifier("commonQueueC") Queue commonQueueC,@Qualifier("commonExchange") DirectExchange commonExchange){return BindingBuilder.bind(commonQueueC).to(commonExchange).with("XC");}//声明队列@Bean("deadQueueD")public Queue deadQueueD(){return QueueBuilder.durable(DEAD_QUEUE_D).build();}//绑定@Beanpublic Binding deadQueueDBindingDeadExchange(@Qualifier("deadQueueD") Queue deadQueueD,@Qualifier("deadExchange") DirectExchange deadExchange){return BindingBuilder.bind(deadQueueD).to(deadExchange).with("YD");}
}
-----------------------------------
package com.day.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.Date;
//发送延迟消息
//打印日志的注解
@Slf4j
@RestController
@RequestMapping("/ttl")
public class ProducerController implements Serializable{@Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){//System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message);log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列: "+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列: "+message);}//开始发消息 消息 TTL@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}ms信息给TTL队列QC:{}",new Date(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg ->{msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}
------------------------------------------
package com.day.controller;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Date;
//消费者
@Slf4j
@Component
public class Consumer implements Serializable {@RabbitListener(queues="QD")public void receiveD(Message message, Channel channel) throws Exception{byte[] body=message.getBody();String msg=new String(body, Charset.forName("UTF-8"));log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

image.png

1635686907259023471.png

//3.日志文件配置与使用
配置-------------
<?xml version="1.0" encoding="UTF-8"?>
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<!-- scan:当此属性设置为true时,配置文档如果发生改变,将会被重新加载,默认值为true -->
<!-- scanPeriod:设置监测配置文档是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
<!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
<configuration  scan="true" scanPeriod="10 seconds"><contextName>logback-spring</contextName><!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义后,可以使“${}”来使用变量。 --><property name="logging.path" value="src/main/resources/static/client" /><!--0. 日志格式和颜色渲染 --><!-- 彩色日志依赖的渲染类 --><conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" /><conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" /><conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" /><!-- 彩色日志格式 --><property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/><!--1. 输出到控制台--><appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"><!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息--><filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>debug</level></filter><encoder><Pattern>${CONSOLE_LOG_PATTERN}</Pattern><!-- 设置字符集 --><charset>UTF-8</charset></encoder></appender><!--2. 输出到文档--><!-- 2.1 level为 DEBUG 日志,时间滚动输出  --><appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文档的路径及文档名 --><file>${logging.path}/web_debug.log</file><!--日志文档输出格式--><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern><charset>UTF-8</charset> <!-- 设置字符集 --></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 日志归档 --><fileNamePattern>${logging.path}/web-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>100MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文档保留天数--><maxHistory>15</maxHistory></rollingPolicy><!-- 此日志文档只记录debug级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>debug</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- 2.2 level为 INFO 日志,时间滚动输出  --><appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文档的路径及文档名 --><file>${logging.path}/web_info.log</file><!--日志文档输出格式--><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern><charset>UTF-8</charset></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 每天日志归档路径以及格式 --><fileNamePattern>${logging.path}/web-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>100MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文档保留天数--><maxHistory>15</maxHistory></rollingPolicy><!-- 此日志文档只记录info级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>info</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- 2.3 level为 WARN 日志,时间滚动输出  --><appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文档的路径及文档名 --><file>${logging.path}/web_warn.log</file><!--日志文档输出格式--><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern><charset>UTF-8</charset> <!-- 此处设置字符集 --></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${logging.path}/web-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>100MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文档保留天数--><maxHistory>15</maxHistory></rollingPolicy><!-- 此日志文档只记录warn级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>warn</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- 2.4 level为 ERROR 日志,时间滚动输出  --><appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文档的路径及文档名 --><file>${logging.path}/web_error.log</file><!--日志文档输出格式--><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern><charset>UTF-8</charset> <!-- 此处设置字符集 --></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${logging.path}/web-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>100MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文档保留天数--><maxHistory>15</maxHistory></rollingPolicy><!-- 此日志文档只记录ERROR级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>ERROR</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!--<logger>用来设置某一个包或者具体的某一个类的日志打印级别、以及指定<appender>。<logger>仅有一个name属性,一个可选的level和一个可选的addtivity属性。name:用来指定受此logger约束的某一个包或者具体的某一个类。level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,还有一个特俗值INHERITED或者同义词NULL,代表强制执行上级的级别。如果未设置此属性,那么当前logger将会继承上级的级别。addtivity:是否向上级logger传递打印信息。默认是true。<logger name="org.springframework.web" level="info"/><logger name="org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor" level="INFO"/>--><!--使用mybatis的时候,sql语句是debug下才会打印,而这里我们只配置了info,所以想要查看sql语句的话,有以下两种操作:第一种把<root level="info">改成<root level="DEBUG">这样就会打印sql,不过这样日志那边会出现很多其他消息第二种就是单独给dao下目录配置debug模式,代码如下,这样配置sql语句会打印,其他还是正常info级别:【logging.level.org.mybatis=debug logging.level.dao=debug】--><!--root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,不能设置为INHERITED或者同义词NULL。默认是DEBUG可以包含零个或多个元素,标识这个appender将会添加到这个logger。--><!--过滤掉spring和mybatis的一些无用的DEBUG信息--><logger name="org.springframework" level="INFO"></logger><logger name="org.mybatis" level="INFO"></logger><logger name="org.apache.zookeeper" level="INFO"></logger><!-- 4. 最终的策略 --><!-- 4.1 开发环境:打印控制台--><springProfile name="dev"><logger name="com.dowin.globalvillage.controller" level="debug"/><!-- 修改此处扫描包名 --></springProfile><root level="debug"><appender-ref ref="CONSOLE" /><appender-ref ref="DEBUG_FILE" /><appender-ref ref="INFO_FILE" /><appender-ref ref="WARN_FILE" /><appender-ref ref="ERROR_FILE" /></root><!--4.2 生产环境:输出到文档--><springProfile name="pro"><root level="info"><appender-ref ref="CONSOLE" /><appender-ref ref="DEBUG_FILE" /><appender-ref ref="INFO_FILE" /><appender-ref ref="ERROR_FILE" /><appender-ref ref="WARN_FILE" /></root></springProfile>
</configuration>
使用------------
package com.day.controller;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Date;
//消费者
@Slf4j
@Component
public class Consumer implements Serializable {@RabbitListener(queues="QD")public void receiveD(Message message, Channel channel) throws Exception{byte[] body=message.getBody();String msg=new String(body, Charset.forName("UTF-8"));log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

//三.发布订阅

//1.延迟队列优化    
//(1)每个队列只对应一个延迟,如果面对变化的需求,怎么解决呢?    
//(2)写个传ttlTime的控制方法,但是发现发送20秒   发送2秒,都是一个时间接受到,为什么?    
//(3)RMQ只会检查第一个消息是否过期,就算是第二个消息延迟很短,第二个消息也不会优先执行,怎么弥补呢?    
//(4)延迟队列(基于插件的)下载插件rabbitmq_delayed_message_exchange-3.8.0.ez   百度网盘找    
//(5)放置到/usr/local/rabbitmq/plugins目录下    
//(6)安装 rabbitmq-plugins enable   rabbitmq_delayed_message_exchange    
//(7)rabbitmqctl stop(停止)    rabbitmq-server -detached(启动)    
//(8)重启看到x-delayed-message就代表成功。    
//(9)写代码    
------------配置
package com.day.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {//交换机 队列 routingKey//队列public static final String DELAYED_QUEUE_NAME="delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME="delayed.exchange";//routingKeypublic static final String DELAYED_ROUTING_KEY="delayed.routingKey";//声明交换机@Beanpublic CustomExchange delayedExchange(){Map<String,Object> arguments=new HashMap<>();arguments.put("x-delayed-type","direct");//交换机名称、类型、持久化、自动删除、其他参数return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}//队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}//绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
-------------生产者
package com.day.controller;
import com.day.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.Date;
//发送延迟消息
//打印日志的注解
@Slf4j
@RestController
@RequestMapping("/ttl")
public class ProducerController implements Serializable{@Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){//System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message);log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列: "+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列: "+message);}//开始发消息 消息 TTL@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}ms信息给TTL队列QC:{}",new Date(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg ->{msg.getMessageProperties().setExpiration(ttlTime);return msg;});}//开始发消息,基于插件的 消息及延迟的时间@GetMapping("/sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable int delayTime){log.info("当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue:{}",new Date(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{msg.getMessageProperties().setDelay(delayTime);return msg;});}
}
------------消费者
package com.day.controller;
import com.day.config.DelayedQueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Date;
//消费者
@Slf4j
@Component
public class Consumer implements Serializable {@RabbitListener(queues= DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayQueue(Message message, Channel channel) throws Exception{byte[] body=message.getBody();String msg=new String(body, Charset.forName("UTF-8"));log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

image.png

//2.小结    
//(1)推荐使用RMQ解决延迟队列问题

四.发布确认高级

//1.概述    
//(1)RMQ重启期间,生产者消息投递失败,导致消息丢失,需要手动处理和恢复。    
//(2)交换机和队列有一个不在,都会导致消息的丢失。    
//(3)如果交换机收不到消息应该如何处理?    
//(4)生产者通过回调接口感知交换机是否接受消息成功。    
-------确认
package com.day.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String CONFIRM_EXCHANGE_NAME="confirm_exchange";//队列public static final String CONFIRM_QUEUE_NAME="confirm_queue";//routingKeypublic static final String CONFIRM_ROUTING_KEY="key1";//声明交换机@Beanpublic DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}//队列@Beanpublic Queue confirmQueue(){return new Queue(CONFIRM_QUEUE_NAME);}//绑定@Beanpublic Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue,@Qualifier("confirmExchange")DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}
------------回调配置
package com.day.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {@Resourceprivate RabbitTemplate rabbitTemplate;//注入@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}//交换机确认回调方法//1.发消息 交换机接收到了 会回调//correlationData 保存回调消息的ID及相关信息// b true 交换机收到了消息// s 失败的原因,成功时为null//2.发消息 交换机接收失败了,也会回调// b为false@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String ID;if(correlationData!=null){ID= correlationData.getId();}else{ID="";}if(ack){log.info("交换机已经收到消息,ID为{}的消息",ID);}else{log.info("交换机还未收到消息,ID为{}的消息,原因为{}",ID,cause);}}
}
------------yml文件配置
spring:#devtools:#restart:#enabled=true: #支持热部署  可能导致重启,然后非实时语音转写报错。rabbitmq:host: wdfgdzx.topport: 5672username: xlliu24password: s19911009!publisher-confirm-type: correlated
------------生产者
package com.day.controller;
import com.day.config.ConfirmConfig;
import com.day.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.Date;
//发送延迟消息
//打印日志的注解
@Slf4j
@RestController
@RequestMapping("/ttl")
public class ProducerController implements Serializable{@Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){//System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message);log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列: "+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列: "+message);}//开始发消息 消息 TTL@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}ms信息给TTL队列QC:{}",new Date(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg ->{msg.getMessageProperties().setExpiration(ttlTime);return msg;});}//开始发消息,基于插件的 消息及延迟的时间@GetMapping("/sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable int delayTime){log.info("当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue:{}",new Date(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{msg.getMessageProperties().setDelay(delayTime);return msg;});}//开始发送消息 测试确认@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message){CorrelationData correlationData=new CorrelationData("110161");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"123",message,correlationData);log.info("发送消息内容为:{}",message);}
}
----------消费者
package com.day.controller;
import com.day.config.ConfirmConfig;
import com.day.config.DelayedQueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Date;
//消费者
@Slf4j
@Component
public class Consumer implements Serializable {@RabbitListener(queues= ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message, Channel channel) throws Exception{byte[] body=message.getBody();String msg=new String(body, Charset.forName("UTF-8"));log.info("当前时间:{},接收到的队列confirm.queue消息:{}",new Date().toString(),msg);}
}

image.png

//2.回退消息    
//(1)如果发现交换机和信道之间不可路由,要通过设置Mandatory参数可以在不可送达时送回给生产者。 
------------生产者
package com.day.controller;
import com.day.config.ConfirmConfig;
import com.day.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.Date;
//发送延迟消息
//打印日志的注解
@Slf4j
@RestController
@RequestMapping("/ttl")
public class ProducerController implements Serializable{@Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){//System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message);log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列: "+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列: "+message);}//开始发消息 消息 TTL@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}ms信息给TTL队列QC:{}",new Date(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg ->{msg.getMessageProperties().setExpiration(ttlTime);return msg;});}//开始发消息,基于插件的 消息及延迟的时间@GetMapping("/sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable int delayTime){log.info("当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue:{}",new Date(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{msg.getMessageProperties().setDelay(delayTime);return msg;});}//开始发送消息 测试确认@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message){CorrelationData correlationData=new CorrelationData("110161");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"123",message,correlationData);log.info("发送消息内容为:{}",message);}
}   
------------回调配置
package com.day.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Resourceprivate RabbitTemplate rabbitTemplate;//注入@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}//交换机确认回调方法//1.发消息 交换机接收到了 会回调//correlationData 保存回调消息的ID及相关信息// b true 交换机收到了消息// s 失败的原因,成功时为null//2.发消息 交换机接收失败了,也会回调// b为false@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String ID;if(correlationData!=null){ID= correlationData.getId();}else{ID="";}if(ack){log.info("交换机已经收到消息,ID为{}的消息",ID);}else{log.info("交换机还未收到消息,ID为{}的消息,原因为{}",ID,cause);}}//在消息不可送达时,将消息返回给生产者,只有失败的时候才会回退@Overridepublic void returnedMessage(Message message, int replyCode,String replyText, String exchange,String routingKey) {log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}",new String(message.getBody()),exchange,replyText,routingKey);}
}
------------yml配置
spring:#devtools:#restart:#enabled=true: #支持热部署  可能导致重启,然后非实时语音转写报错。rabbitmq:host: wdfgdzx.topport: 5672username: xlliu24password: s19911009!publisher-confirm-type: correlatedpublisher-returns: true

image.png

//3.备份交换机    
//(1)先写交换机、路由、队列的绑定关系    
//(2)再写消费者,然后删除原有的确认交换机(因为他会转发,和之前的不同了)    
------交换机之间关系配置
package com.day.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String CONFIRM_EXCHANGE_NAME="confirm_exchange";//队列public static final String CONFIRM_QUEUE_NAME="confirm_queue";//routingKeypublic static final String CONFIRM_ROUTING_KEY="key1";//备份交换机public static final String BACKUP_EXCHANGE_NAME="backup_exchange";//备份队列public static final String BACKUP_QUEUE_NAME="backup_queue";//报警队列public static final String WARNING_QUEUE_NAME="warning_queue";//声明交换机@Beanpublic DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();}//队列@Beanpublic Queue confirmQueue(){return new Queue(CONFIRM_QUEUE_NAME);}//绑定@Beanpublic Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue,@Qualifier("confirmExchange")DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}//备份交换机@Beanpublic FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//队列@Beanpublic Queue backupQueue(){return new Queue(BACKUP_QUEUE_NAME);}@Beanpublic Queue warningQueue(){return new Queue(WARNING_QUEUE_NAME);}@Beanpublic Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue")Queue backupQueue,@Qualifier("backupExchange")FanoutExchange backupExchange){return BindingBuilder.bind(backupQueue).to(backupExchange);}@Beanpublic Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue")Queue warningQueue,@Qualifier("backupExchange")FanoutExchange backupExchange){return BindingBuilder.bind(warningQueue).to(backupExchange);}
}
-------报警消费者
package com.day.controller;
import com.day.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;//报警消费者
@Slf4j
@Component
public class WarningConsumer {@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message){String msg=new String(message.getBody());log.error("报警,发现不可路由消息:{}",msg);log.info("当前时间:{},接收到的队列warning_queue消息:{}",new Date().toString(),msg);}
}

image.png

//4.RMQ其他知识点    
//(1)消息被重复消费,重复扣了用户的钱。幂等性问题    
//(2)幂等性问题的解决一般使用全局ID,使用该ID判断该消息是否已消费过。    
//(3)唯一ID+指纹码,或利用redis的原子性去实现    
//(4)利用redis执行setnx命令,天然具有幂等性,从而实现不重复消费。
//5.优先级队列    
//(1)订单催付场景 RMQ进行改造和优化,对大客户的订单进行优先级的提升。    
//(2)生产者先把消息发到队列之中,然后消费者再消费。    
------交换机、队列、路由配置
package com.day.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class CommonConfig {//交换机public static final String EXCHANGE_NAME="exchange";//队列public static final String QUEUE_NAME="queue";//routingKeypublic static final String ROUTING_KEY="key";//声明交换机@Beanpublic DirectExchange exchange(){return new DirectExchange(EXCHANGE_NAME);}//队列@Beanpublic Queue queue(){Map<String,Object> arguments=new HashMap<>();//队列arguments.put("x-max-priority",10);//官方允许是0-255 此处设置10 允许0-10 不用设置过大 浪费CUP和内存return QueueBuilder.durable(QUEUE_NAME).withArguments(arguments).build();}//绑定@Beanpublic Binding queueBindingExchange(@Qualifier("queue")Queue queue,@Qualifier("exchange")DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}
}
------生产者
package com.day.controller;
import com.day.config.CommonConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.Date;
//发送延迟消息
//打印日志的注解
@Slf4j
@RestController
@RequestMapping("/RMQ")
public class ProducerController implements Serializable{@Resourceprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMessage/{message}")public void sendMsg(@PathVariable String message){for(int i=1;i<11;i++){if(i==5){rabbitTemplate.convertAndSend(CommonConfig.EXCHANGE_NAME,CommonConfig.ROUTING_KEY,"生产者生产消息:"+message+i,msg -> {msg.getMessageProperties().setPriority(5);return msg;});log.info("当前时间:{},发送一条信息给队列:{}",new Date(),message+i);}else{rabbitTemplate.convertAndSend(CommonConfig.EXCHANGE_NAME,CommonConfig.ROUTING_KEY,"生产者生产消息:"+message+i);log.info("当前时间:{},发送一条信息给队列:{}",new Date(),message+i);}}}
}
------消费者
package com.day.controller;
import com.day.config.CommonConfig;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Date;
//消费者
/*
@Slf4j
@Component
public class Consumer implements Serializable {@RabbitListener(queues= CommonConfig.QUEUE_NAME)public void receiveConfirmMessage(Message message, Channel channel) throws Exception{byte[] body=message.getBody();String msg=new String(body, Charset.forName("UTF-8"));log.info("当前时间:{},接收到的队列confirm.queue消息:{}",new Date().toString(),msg);}
}
*/
//接收消息
public class Consumer {//接收消息public static void main(String[] args) throws Exception {//创建连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("47.105.174.97");connectionFactory.setUsername("xlliu24");connectionFactory.setPassword("s19911009!");//创建新链接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//声明 接受消息DeliverCallback deliverCallback=(consumerTag, message) ->{System.out.println(new String(message.getBody()));};//取消消费CancelCallback cancelCallback= consumerTage ->{System.out.println("消费消息被中断...");};//消费者消费消息/*** 1.消费哪个队列* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调** */channel.basicConsume(CommonConfig.QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

image.png

//6.惰性队列    
//(1)消息保存在内存中还是在磁盘上,正常消息是保存在内存中。在惰性中,消息是保存在磁盘中的。    
//(2)当有消费者宕机、大量消息积压时,才用惰性队列。

//五.集群

//1.集群原理    
//(1)集群可以不断的扩充,需要有3台机器    
//(2)修改三台机器的RMQ名称,分别为node1 node2 node3    
//(3)vim /etc/hosts  IP 节点名称对应。三个机器都需要这么配置    
//(4)远程复制命令 scp   /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie    
//(5)重启RMQ与erlang   rabbitmq-server -detached    
//(6)节点二操作rabbitmqctl   stop_app ; rabbitmqctl reset ;rabbitmqctl join_cluster rabbit@node1;rabbitmqctl start_app   ;节点三操作相同    
//(7)rabbitmqctl cluster_status 查看集群状态    
//(8)rabbitmqctl add_user admin 123   ;rabbitmqctl set_user_tags admin administrator    
//(9)rabbitmqctl set_permissions -p   "/" admin ".*" ".*" ".*"    
//(10)登录后在Overview中可以看到节点    
//(11)也可以解除集群节点

image.png

//2.镜像队列    
//(1)如果有个节点宕机了,重启发现消息丢失了,镜像队列就是备份。    
//(2)发给1节点,但备份到2号节点。通过85集的设置可以实现。就算整个集群只剩下一台机器,也可以处理。
//3.负载均衡    
//(1)Haproxy实现负载均衡(比如Nginx),实现高并发高可用
//4.联邦交换机    
//(1)异地机房网络延迟的问题,北京ExchangeA  深圳ExchangeB,如果北京的用户访问深圳的RMQ怎么办?    
//(2)在每台机器上开启federation相关插件    
//(3)rabbitmq-plugins   enable rabbitmq_faderation ;rabbitmaq-plugins enablerabbitmq_federation_management   ; 自带的插件,能看到 Federation Status ;Federation Upstarems    
//(4)联邦队列 也可以同步数据,交换机也可以实现。
//5.Shovel    
//(1)铲子,可以将数据从一端挖到另一端    
//(2)rabbitmq-plugins   enable rabbitmq_shovelrabbitmq-plugins enable   rabbitmq_shovel_management    
//(3)node1 q1 同步到node2 q2中  实现跨地区数据同步

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/830950.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

服务器数据恢复—服务器重装系统导致XFS分区丢失的数据恢复案例

服务器数据恢复环境&#xff1a; 一台服务器MD1200磁盘柜&#xff0c;通过raid卡将15块磁盘组建成一组raid5磁盘阵列。raid5阵列分配了2个lun&#xff0c;操作系统层面对lun进行分区&#xff1a;1个分区采用LVM扩容方式加入到了root_lv中&#xff0c;其余分区格式化为XFS文件系…

强化学习(Reinforcement learning)基本概念

概念&#xff1a; 强化学习是在与环境互动中为达到一个目标而进行的学习过程 三层结构&#xff1a; 基本元素&#xff1a;agent、environment、goal agent&#xff1a;可以理解为玩家&#xff0c;即某个游戏的参与方 environment&#xff1a;环境本身&#xff0c;可以理…

数据结构复习指导之串的模式匹配

文章目录 串的模式匹配 考纲内容 复习提示 1.简单的模式匹配算法 知识回顾 2.串的模式匹配算法——KMP算法 2.1字符串的前缀、后缀和部分匹配值 2.2KMP算法的原理是什么 3.KMP算法的进一步优化 串的模式匹配 考纲内容 字符串模式匹配 复习提示 本章是统考大纲第6章内…

Android开发知识杂录

1.XML解析问题 增加XML布局文件时候出现 mergeDebugResources 错误 解决方案 由于XML默认文件带有BOM&#xff0c;remove bom即可 2.开机启动界面添加 3.开机隐藏系统桌面 4.添加敲击传感器GPIO 1. 测试板子的GPIO引脚情况 echo in > /sys/class/gpio/gpio<gpio_number…

排序-八大排序FollowUp

FollowUp 1.插入排序 (1).直接插入排序 时间复杂度:最坏情况下:0(n^2) 最好情况下:0(n)当数据越有序 排序越快 适用于: 待排序序列 已经基本上趋于有序了! 空间复杂度:0(1) 稳定性:稳定的 public static void insertSort(int[] array){for (int i 1; i < array.length; i…

计算机网络chapter1——家庭作业

文章目录 复习题1.1节&#xff08;1&#xff09; “主机”和“端系统”之间有何不同&#xff1f;列举几种不同类型的端系统。web服务器是一种端系统吗&#xff1f;&#xff08;2&#xff09;协议一词常用来用来描述外交关系&#xff0c;维基百科是如何描述外交关系的&#xff1…

mac虚拟机软件哪个好 mac虚拟机怎么安装Windows 苹果Mac电脑上受欢迎的主流虚拟机PK Parallels Desktop和VM

什么是苹果虚拟机&#xff1f; 苹果虚拟机是一种软件工具&#xff0c;它允许在非苹果硬件上运行苹果操作系统&#xff08;如ios&#xff09;。通过使用虚拟机&#xff0c;您可以在Windows PC或Linux上体验和使用苹果的操作系统&#xff0c;而无需购买苹果硬件。 如何使用苹果虚…

CSDN如何在个人主页开启自定义模块|微信公众号

目前只有下面三种身份才具有这个功能。 VIP博客专家企业博客 栏目内容不知道怎么写HTML的&#xff0c;可以联系我帮你添加

Maven入门:1.简介与环境搭建

一.简介与环境搭建 1.Maven&#xff1a;用于自动化构建项目&#xff08;按照企业主流模板构建完善的项目结构&#xff09;和管理项目依赖&#xff08;依赖就是项目的jar包&#xff0c;通过配置的方式进行添加和管理&#xff0c;自动下载和导入&#xff09;的工具。即更加方便构…

C 408—《数据结构》图、查找、排序专题考点(含解析)

目录 Δ前言 六、图 6.1 图的基本概念 6.2 图的存储及基本操作 6.3 图的遍历 6.4 图的应用 七、查找 7.2 顺序查找和折半查找 7.3 树型查找 7.4 B树和B树 7.5 散列表 八、排序 8.2 插入排序 8.3 交换排序 8.4 选择排序 8.5 归并排序和基数排序 8.6 各种内部排序算法的比较及…

表格中斜线的处理

此处的斜线,不是用表格写的,但是也适用于表格,只是需要更改表格的样式,可以 按照如下处理,即可 呈现的效果:如图所示 template部分: <div class"header_detail custom"><div class"right">节次</div><div class"left">…

C/C++实现高性能并行计算——1.pthreads并行编程(中)

系列文章目录 pthreads并行编程(上)pthreads并行编程(中)pthreads并行编程(下)使用OpenMP进行共享内存编程 文章目录 系列文章目录前言一、临界区1.1 pi值估计的例子1.2 找到问题竞争条件临界区 二、忙等待三、互斥量3.1 定义和初始化互斥锁3.2 销毁。3.3 获得临界区的访问权&…

windows11安装nginx

1.解压nginx安装包到没有中文的目录 2.双击运行nginx.exe 3.任务管理器查看是否有nginx进程 4.任务管理器->性能->资源监视器 5.网络->侦听端口&#xff0c;查看nginx侦听的端口&#xff0c;这里是90端口

大连宇都环境 | 成都5月水科技大会暨技术装备成果展览会

中华环保联合会水环境治理专业委员会 秘书处 王小雅 13718793867 —— 展位号&#xff1a;A09 —— 一、企业介绍 大连宇都环境成立于2002年&#xff0c;公司20年 MBBR填料产品及工艺技术&#xff0c;&#xff0c;构建了研发、制造、设计、工程、运营链式服务能力&#xff…

数据赋能(73)——数据要素:特征

生产要素中的数据要素具有一系列基本特征&#xff0c;这些特征使得数据在现代经济活动中发挥着越来越重要的作用。数据要素的主要特征如下图所示。 数据已经成为关键的生产要素&#xff0c;数据要素的基本特征可以概括为&#xff1a;虚拟性、非消耗性、非稀缺性、非均质性、排他…

selinux 基础知识

目录 概念 作用 SELinux与传统的权限区别 SELinux工作原理 名词解释 主体&#xff08;Subject&#xff09; 目标&#xff08;Object&#xff09; 策略&#xff08;Policy&#xff09; 安全上下文&#xff08;Security Context&#xff09; 文件安全上下文查看 先启用…

如何解决网络应用运行中的审核问题【系列研究预告】

目前互联网是非常发达的&#xff0c;但是随着技术的发展&#xff0c;有些问题逐渐变得严重。对于一般企业而言&#xff0c;一个比较重要的问题就是审核准确性和成本问题。 比如知乎的审判官&#xff0c;我本人是最早的一批审判官&#xff0c;然而多年下来的经历却很让人感到无…

数据结构—C语言实现双向链表

目录 1.双向带头循环链表 2.自定义头文件&#xff1a; 3.List.cpp 文件 3.1 newnode()函数讲解 3.2 init() 函数 初始化 3.3 pushback()函数 尾插 3.4 pushfront()函数 头插 3.5 popback() 尾删 3.6 popfront() 函数 头删 3.7 insert()函数 在pos之后插入 3.8 popbac…

uniapp 对接 Apple 登录

由于苹果要求App使用第三方登录必须要求接入Apple登录 不然审核不过 所以&#xff1a; 一、勾选苹果登录 二、 设置AppId Sign In Apple 设置完成重新生成描述文件 &#xff01;&#xff01;&#xff01;&#xff01;证书没关系 示例代码&#xff1a; async appleLogin…

Delta lake with Java--将数据保存到Minio

今天看了之前发的文章&#xff0c;居然有1条评论&#xff0c;看到我写的东西还是有点用。 今天要解决的问题是如何将 Delta产生的数据保存到Minio里面。 1、安装Minio&#xff0c;去官网下载最新版本的Minio&#xff0c;进入下载目录&#xff0c;运行如下命令&#xff0c;曾经…