2.7日学习打卡----初学RabbitMQ(二)

2.7日学习打卡

在这里插入图片描述
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则
——JMS,用于操作消息中间件。JMS即Java消息服务
(JavaMessage Service)应用程序接口,是一个Java平台中关于面
向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多
MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没
有实现JMS规范,但是开源社区有JMS的实现包。

创建项目

# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

创建普通maven项目,添加RabbitMQ依赖:

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqpclient</artifactId><version>5.14.0</version></dependency>
</dependencies>

一. RabbitMQ 简单模式在这里插入图片描述

P:生产者,也就是要发送消息的程序

C:消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

特点:

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

生产者代码实现

步骤:

  1. 创建连接工厂ConnectionFactory
  2. 设置工厂的参数
  3. 创建连接 Connection
  4. 创建管道 Channel
  5. 简单模式中没有交换机exchange,所以不用创建(RabbitMQ会使用默认的交换机!)
  6. 创建队列 queue
  7. 设置发送内容,使用channal.basicPublish()发送
  8. 释放资源

代码实现

package com.jjy.mq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//使用自己的服务器ip地址connectionFactory.setHost("192.168.66.100");//rabbitmq的默认端口5672connectionFactory.setPort(5672);//用户名connectionFactory.setUsername("jjy");//密码connectionFactory.setPassword("jjy");//虚拟机connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建队列,如果队列已存在,则使用该队列/**//     * 参数1:队列名//     * 参数2:是否持久化,true表示MQ重启后队列还在。//     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问//     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列//     * 参数5:其他额外参数//     */channel.queueDeclare("simple_queue",false,false,false,null);//5.发送消息String mesg="hello rabbitmq";/*** 参数1:交换机名,""表示默认交换机* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/channel.basicPublish("","simple_queue",null,mesg.getBytes());//6.关闭资源(信道和连接)channel.close();connection.close();System.out.println("发送成功");}
}

消费者代码实现

在这里插入图片描述

步骤:

1.创建连接工厂ConnectionFactory
2.设置工厂参数
3.创建连接
4.创建信道
前四步代码基本是一致的,需要注意的是生产者与消费者的Channel是不同Connection中的!不是同一个对象.
5. 最简单的模型没有交换机exchange,所以此处RabbitMQ会使用默认的交换机
6. 接收消息,有一个回调方法 channel.basicConsume()

代码实现

package com.jjy.mq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.监听队列/*** 参数1:监听的队列名* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费*/channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body,"UTF-8");System.out.println("接受消息,消息为:"+message);}});//}
}

二. RabbitMQ 工作队列模式

在这里插入图片描述
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该
模式也使用direct交换机,应用于处理消息较多的情况。特点如
下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

生产者代码实现

代码实现

package com.jjy.mq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建队列,持久化队列channel.queueDeclare("work_queue",true,false,false,null);// 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中for(int i=0;i<100;i++){channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("你好,这是今天的第"+i+"条消息").getBytes());}// 6.关闭资源channel.close();connection.close();}
}

消费者代码实现

在这里插入图片描述

消费者1:

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1消费消息,消息为:" + message);}});}
}

消费者2

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer2 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者2消费消息,消息为:" + message);}});}}

消费者3

package com.jjy.mq.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer3 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();// 4.监听队列,处理消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者3消费消息,消息为:" + message);}});}}

三. RabbitMQ 发布订阅模式

在这里插入图片描述
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电
商网站的同一条促销信息需要短信发送、邮件发送、站内信发送
等。此时可以使用发布订阅模式(Publish/Subscribe)
特点:

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic(常用):通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失
在这里插入图片描述

生产者代码实现

与之前的步骤相比,多了创建交换机和绑定交换机与队列的操作

代码实现

package com.jjy.mq.publish;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*exchangeDeclare(String exchange,                  -- 交换机的名称String type,                      -- 交换机的类型,4种枚举(direct,fanout,topic,headers)boolean durable,                  -- 持久化boolean autoDelete,               -- 自动删除boolean internal,                 -- 内部使用,基本是falseMap<String, Object> arguments)    -- 参数*/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);//5.创建队列//短信队列channel.queueDeclare("SEND_MAIL",true,false,false,null);//消息队列channel.queueDeclare("SEND_MESSAGE",true,false,false,null);//站内信息channel.queueDeclare("SEND_STATION",true,false,false,null);//6.交换机绑定队列/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL","exchange_fanout","");channel.queueBind("SEND_MESSAGE","exchange_fanout","");channel.queueBind("SEND_STATION","exchange_fanout","");//7.发送消息for (int i = 1; i <= 10 ; i++) {channel.basicPublish("exchange_fanout","",null,("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));}//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

接下来编写三个消费者,分别监听各自的队列。
//站内信消费者

package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

邮件消费者

 
package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

短信消费者

package com.jjy.mq.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

也可以使用工作队列+发布订阅模式同时使用,两个消费者同时监听
一个队列:


// 短信消费者2
public class CustomerMessage2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.0.162");connectionFactory.setPort(5672);connectionFactory.setUsername("itbaizhan");connectionFactory.setPassword("itbaizhan");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信2:"+message);}});}
}

两个不一样的系统,对同一条消息做不一样的处理

发布订阅模式与工作队列模式的区别
(1)工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机

(2)发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)

(3)发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

四. RabbitMQ 路由模式

在这里插入图片描述
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多
时候,不是所有消息都无差别的发布到所有队列中。比如电商网站
的促销活动,双十一大促可能会发布到所有队列;而一些小的促销
活动为了节约成本,只发布到站内信队列。此时需要使用路由模式
(Routing)完成这一需求。
特点:

  1. 每个队列绑定路由关键字RoutingKey
  2. 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模
    式使用direct交换机。

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
在这里插入图片描述

生产者代码实现

package com.jjy.mq.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);// 5.创建队列channel.queueDeclare("SEND_MAIL2",true,false,false,null);channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);channel.queueDeclare("SEND_STATION2",true,false,false,null);//6.交换机绑定队列/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL2","exchange_routing","import");channel.queueBind("SEND_MESSAGE2","exchange_routing","import");channel.queueBind("SEND_STATION2","exchange_routing","import");channel.queueBind("SEND_STATION2","exchange_routing","normal");//7.发送消息channel.basicPublish("exchange_routing","import",null,"双十一大促活动".getBytes());channel.basicPublish("exchange_routing","normal",null,"小型促销活动".getBytes());//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

站内信消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

短信消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

邮件消费者

package com.jjy.mq.routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

总的来说就一句话:

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

五. RabbitMQ 通配符模式

在这里插入图片描述
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活,使用topic交换
机.
通配符规则

  1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。
  2. 队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。

生产者代码实现

在这里插入图片描述
代码实现

package com.jjy.mq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class produce {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");//2.创建连接Connection connection = connectionFactory.newConnection();//3.建立信道Channel channel = connection.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);// 5.创建队列channel.queueDeclare("SEND_MAIL3",true,false,false,null);channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);channel.queueDeclare("SEND_STATION3",true,false,false,null);//6.交换机绑定队列channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");//7.发送消息channel.basicPublish("exchange_topic","mail.message.station",null,"双十一大促活动".getBytes());channel.basicPublish("exchange_topic","station",null,"小型促销活动".getBytes());//8.关闭资源channel.close();connection.close();}
}

消费者代码实现

站内信消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 站内信消费者
public class CustomerStation {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送站内信:"+message);}});}
}

短信消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMessage {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送短信:"+message);}});}
}

邮件消费者

package com.jjy.mq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class CustomerMail {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.66.100");connectionFactory.setPort(5672);connectionFactory.setUsername("jjy");connectionFactory.setPassword("jjy");connectionFactory.setVirtualHost("/");// 默认虚拟机//2.创建连接Connection conn = connectionFactory.newConnection();//3.建立信道Channel channel = conn.createChannel();// 4.监听队列channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");System.out.println("发送邮件:"+message);}});}
}

总述:topics模式比routing模式要更加灵活,笼统的说就是routing模式加上通配符

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/676970.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot174基于springboot的疾病防控综合系统的设计与实现

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…

vscode wsl远程连接 权限问题

问题描述&#xff1a;执行命令时遇到Operation not permitted 和 Permission denied问题&#xff0c;是有关ip地址和创建文件的权限问题&#xff0c;参考网络上更改wsl.conf文件等方法均无法解决&#xff0c;只能加sudo来解决

【MySQL进阶之路】磁盘随机读写和顺序读写对MySQL性能的影响

欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术的推送&#xff01; 在我后台回复 「资料」 可领取编程高频电子书&#xff01; 在我后台回复「面试」可领取硬核面试笔记&#xff01; 文章导读地址…

前后端通讯:前端调用后端接口的五种方式,优劣势和场景

Hi&#xff0c;我是贝格前端工场&#xff0c;专注前端开发8年了&#xff0c;前端始终绕不开的一个话题就是如何和后端交换数据&#xff08;通讯&#xff09;&#xff0c;本文先从最基础的通讯方式讲起。 一、什么是前后端通讯 前后端通讯&#xff08;Frontend-Backend Commun…

解析十六进制雷达数据格式:解析雷达数据长度。

以Cat62格式雷达数据为例&#xff0c;十六进制雷达数据部分代码&#xff1a; 3e0120bf7da4ffee0085 雷达数据长度使用4个字符&#xff08;2个字节&#xff09;标识&#xff0c;在这里是“0120”&#xff0c;转换为十进制数为288。 雷达数据长度父类&#xff1a; base_length_…

python 基础知识点(蓝桥杯python科目个人复习计划35)

今日复习计划&#xff1a;阶段总结&#xff08;新年贺礼&#xff09; 1.python简介&#xff08;定义&#xff0c;优点&#xff0c;缺点&#xff0c;应用领域&#xff09; python&#xff1a;一种广泛使用的解释型&#xff0c;高级和通用的编程语言 python极简&#xff0c;生…

Xray 工具笔记

Xray 官方文档 扫描单个url&#xff08;非爬虫&#xff09; 并输出文件&#xff08;不同文件类型&#xff09; .\xray.exe webscan --url 10.0.0.6:8080 --text-output result.txt --json-output result.json --html-output report.html默认启动所以内置插件 &#xff0c;指定…

前端JavaScript篇之实现call、apply 及 bind 函数

目录 实现call、apply 及 bind 函数1. 实现call函数2. 实现apply函数3. 实现bind函数 实现call、apply 及 bind 函数 call、apply和bind函数都是用于改变函数中this指向的方法。它们的作用都是使函数能够在不同的对象上下文中运行。call方法和apply方法的作用类似&#xff0c;…

多元回归分析:理论与应用

多元回归分析是一种统计方法&#xff0c;用于研究两个或多个自变量&#xff08;解释变量&#xff09;与一个因变量&#xff08;响应变量&#xff09;之间的关系。这种分析允许研究者评估多个因素对结果变量的影响&#xff0c;是社会科学、经济学、生物医学和工程等多个领域中常…

【doghead】uv_loop_t的创建及线程执行

worker测试程序,类似mediasoup对uv的使用,是one loop per thread 。创建一个UVLoop 就可以创建一个uv_loop_t Transport 创建一个: 试验配置创建一个: UvLoop 封装了libuv的uv_loop_t ,作为共享指针提供 对uv_loop_t 创建并初始化

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Toggle组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之Toggle组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、Toggle组件 组件提供勾选框样式、状态按钮样式及开关样式。 子组件 仅当Toggl…

【MySQL】数据库基础 -- 详解

一、什么是数据库 存储数据用文件就可以了&#xff0c;为什么还要弄个数据库? 一般的文件确实提供了数据的存储功能&#xff0c;但是文件并没有提供非常好的数据&#xff08;内容&#xff09;的管理能力&#xff08;用户角度&#xff09;。 文件保存数据有以下几个缺点&…

无心剑中译佚名《春回大地》

The Coming of Spring 春回大地 I am coming, little maiden, With the pleasant sunshine laden, With the honey for the bee, With the blossom for the tree. 我来啦&#xff0c;小姑娘 满载着欣悦的阳光 蜂儿有蜜酿 树儿有花绽放 Every little stream is bright, All …

Windows 安装 Linux子系统,并为子系统设置图形化界面

安装WSL 在控制面板中打开下面的选项&#xff1a; 执行下面的命令&#xff0c;更新到WSL2版本&#xff0c;并在以后创建子系统的时候默认采用WSL2的版本&#xff1a; wsl --update wsl --set-default-version 2在Window上安装连接工具&#xff1a; 在Window上下载VcXsrv&…

备战蓝桥杯---动态规划之经典背包问题

看题&#xff1a; 我们令f[i][j]为前i个物品放满容量为j的背包的最大价值。 f[i][j]max(f[i-1][j],f[i-1][j-c[i]]w[i]); 我们开始全副成负无穷。f[0][0]0;最后循环最后一行求max; 负无穷&#xff1a;0xc0c0c0c0;正无穷&#xff1a;0x3f3f3f3f 下面是v12,n6的图示&#xff…

深搜问题:素数圆环

祝大家新年快乐&#xff0c;今天给大家带来龙年第一道题 时间限制&#xff1a;1秒 内存限制&#xff1a;128M 题目描述 如图所示为一个由n个圆圈构成的圆环。将自然数1&#xff0c;2&#xff0c;...&#xff0c;n放入圆圈内&#xff0c;并且要求任意两个相邻的圆圈内…

自动化UI,API和DevOps测试架构设计与实现

自动化测试是软件开发过程中的重要环节&#xff0c;它可以提高测试效率、减少人工测试的工作量。本文将介绍自动化测试架构的设计原则和实现方法&#xff0c;以帮助读者理解如何构建一个可靠、可扩展和易于维护的自动化测试框架。 1. 什么是自动化测试&#xff1f; - 解释了…

二阶系统的迹-行列式平面方法(trace-determinant methods for 2nd order system)

让我们再次考虑二阶线性系统 d Y d t A Y \frac{d\mathbf{Y}}{dt}A\mathbf{Y} dtdY​AY 我们已经知道&#xff0c;分析这种二阶系统。最主要的是注意它的特征值情形。 &#xff08;此处没有重根的情形&#xff0c;所有是partial&#xff09; 而特征值&#xff0c;也就是系…

Electron+Vue实现仿网易云音乐实战

前言 这个项目是我跟着官方文档的那个Electron入门教程大致跑了一遍,了解了下Electron开发流程之后的实战项目,所以中间应该是会有很多写法不是很规范,安全性有可能也没考虑到,可实现的各种api也不是很了解,适合初学者。 必须感谢 https://github.com/Binaryify/NeteaseC…

Python 数据可视化之山脊线图 Ridgeline Plots

文章目录 一、前言二、主要内容三、总结 &#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 一、前言 JoyPy 是一个基于 matplotlib pandas 的单功能 Python 包&#xff0c;它的唯一目的是绘制山脊线图 Joyplots&#xff08;也称为 Ridgeline Plots&…