2_RabbitMQ工作模式_Work queues_Publish/Subscribe_Routing_Topics_HeaderRpc

文章目录

  • 2_RabbitMQ工作模式
    • 1.Work queues
    • 2.Publish/Subscribe
      • 1.工作模式
      • 2.代码
        • 1.生产者
          • 1.指定消息队列相关消息
          • 2.建立连接&绑定队列
          • 3.发送消息
          • 完整代码:
        • 2.消费者
          • 1.指定消息队列相关消息
          • 2.建立连接&绑定队列
          • 3.实现消费方法&监听消息
          • 完整代码
      • 3.小结
    • 3.Routing
      • 1.工作模式
      • 2.代码
        • 1.生产者
        • 2.消费者
      • 3.小结
    • 4.Topics
      • 1.工作模式
      • 2.代码
        • 1.生产者
        • 2.消费者
      • 3.小结
    • 5.Header&Rpc
      • header模式
        • 1)生产者
        • 2)发送邮件消费者
      • Rpc

2_RabbitMQ工作模式

前置知识:工作模式指的是某条消息该怎么分发到队列?分发策略

1.Work queues

image-20200710121343883

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

测试:

1、使用入门程序,启动多个消费者。

2、生产者发送多个消息。

结果

1、一条消息只会被一个消费者接收;

2、rabbit采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息。

2.Publish/Subscribe

1.工作模式

image-20200710121452095

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

此交换机的概念类似计算机网络中的交换机,接受包然后分发下去

2.代码

案例:

用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。

1.生产者

声明exchange_routing_inform交换机。

声明两个队列并且绑定到此交换机,绑定时需要指定routingkey

发送消息时需要指定routingkey

1.指定消息队列相关消息
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");
2.建立连接&绑定队列
Connection connection = null;
Channel channel = null;//建立新连接
connection = connectionFactory.newConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel = connection.createChannel();
//声明队列,如果队列在mq 中没有则要创建
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//声明一个交换机
//参数:String exchange, String type
/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing	工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//进行交换机和队列绑定
//参数:String queue, String exchange, String routingKey
/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
3.发送消息
//发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/
for(int i=0;i<20;i++){//消息内容String message = "send inform message to user "+(i+1);channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message);
}
完整代码:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:10**/
public class Producer02_publish {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq 中没有则要创建//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing	工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");//发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<20;i++){//消息内容String message = "send inform message to user "+(i+1);channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2.消费者

1.指定消息队列相关消息
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";//通过连接工厂创建新的连接和mq建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
connectionFactory.setVirtualHost("/");
2.建立连接&绑定队列
Connection connection = null;
Channel channel = null;//建立新连接
connection = connectionFactory.newConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel = connection.createChannel();
//声明队列,如果队列在mq 中没有则要创建
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//声明一个交换机
//参数:String exchange, String type
/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing	工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//进行交换机和队列绑定
//参数:String queue, String exchange, String routingKey
/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
3.实现消费方法&监听消息
//实现消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}
};
//监听队列
//参数:String queue, boolean autoAck, Consumer callback
/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
完整代码
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer02_subscribe_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

3.小结

image-20200710124024869

3.Routing

1.工作模式

image-20200710123545688

路由模式

1、每个消费者监听自己的队列,并且设置routingkey。

2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

2.代码

1.生产者

全部代码:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 19:23**/
public class Producer03_routing {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";private static final String ROUTINGKEY_SMS="inform_sms";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq 中没有则要创建//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");//发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*//* for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());System.out.println("send to mq "+message);}*/for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2.消费者

全部代码:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer03_routing_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

3.小结

image-20200710123942233

4.Topics

1.工作模式

image-20200710124003092

2.代码

1.生产者

声明exchange_routing_inform交换机。

声明两个队列并且绑定到此交换机,绑定时需要指定routingkey

发送消息时需要指定routingkey

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 19:23**/
public class Producer04_topics {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";private static final String ROUTINGKEY_SMS="inform.#.sms.#";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq 中没有则要创建//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);//发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send sms and email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2.消费者

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer04_topics_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

3.小结

image-20200710124424925

5.Header&Rpc

header模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配

队列。

案例

根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种

通知类型都接收的则两种通知都有效。

1)生产者

队列与交换机绑定的代码与之前不同,如下:

image-20200710125647022

通知:

image-20200710125727847

2)发送邮件消费者

image-20200710125744185

Rpc

image-20200710130016749

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

3、服务端将RPC方法 的结果发送到RPC响应队列

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/482661.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1000亿美元!英特尔要在美国建世界最大芯片厂,美520 亿美元芯片法案接近敲定...

来源&#xff1a;新智元编辑&#xff1a;桃子 拉燕 时光1000亿美元芯片计划&#xff0c;要重振英特尔往日雄风&#xff01;路透称&#xff0c;英特尔将投资200亿美元建设2家芯片制造工厂&#xff0c;并计划最终投资多达1000亿美元。新工厂的建设将于今年晚些时候开始&#xff0…

Vue 组件间通信六种方式

前言 组件是 vue.js最强大的功能之一&#xff0c;而组件实例的作用域是相互独立的&#xff0c;这就意味着不同组件之间的数据无法相互引用。一般来说&#xff0c;组件可以有以下几种关系&#xff1a; 如上图所示&#xff0c;A 和 B、B 和 C、B 和 D 都是父子关系&#xff0c;C …

vue-day01-vue模板语法

文章目录Vue 是什么&#xff1f;使用Vue将helloworld 渲染到页面上指令v-cloakv-textv-htmlv-pre**v-once**双向数据绑定v-modelmvvmv-onv-on事件函数中传入参数事件修饰符按键修饰符自定义按键修饰符别名小案例-简单计算器v-bind绑定对象绑定class绑定对象和绑定数组 的区别绑…

IEEE Spectrum调查:AI 的 6 种最坏情况

来源&#xff1a;AI科技评论编译&#xff1a;辛西娅审核&#xff1a;维克多对于人类社会&#xff0c;人工智能&#xff08;AI&#xff09;带来的最大威胁是什么&#xff1f;好莱坞科幻电影的“想象”提供了答案&#xff1a;它逐渐进化&#xff0c;获得人类思考能力&#xff0c;…

day24 反射\元类

反射 reflect # 什么是反射, 其实是反省,自省的意思,反射指的是一个对象应该具备,可以检测,修改,增加自身属性的能力. # 反射就是通过字符串操作属性,涉及的四个函数,这四个函数就是普通的内置函数,没有双下划綫,与print等等没有区别. 案例 hasattr getattr setattr delattr p …

212页PPT详解MEMS微传感器的工作原理(深入全面!)

来源&#xff1a;传感器专家网本文是关于MEMS微传感器的工作原理最全面的内容&#xff0c;分为两部分&#xff0c;共计212页PPT内容。主要讲解了MEMS微传感器的概念、分类&#xff0c;基本敏感原理介绍&#xff0c;MEMS微传感器实例、MEMS微执行器分类、基本致动方式介绍、微执…

C++开发WPF,开发环境配置

C开发WPF&#xff0c;开发环境配置 操作系统:Windows XP SP2, Windwos Vista开发工具:Visual Studio 2005&#xff0c;Expression BlendSDK:.NET Framework 3.0或以上不需要其它的了&#xff0c;比C#开发WPF少了一些。但是需要自己手工打造一些代码&#xff0c;也有不少乐趣在里…

Lucene-01 全文检索基本介绍

文章目录课程计划什么是全文检索数据分类结构化数据搜索非结构化数据查询方法如何实现全文检索全文检索的应用场景Lucene实现全文检索的流程索引和搜索流程图创建索引获得原始文档创建文档对象分析文档创建索引查询索引用户查询接口创建查询执行查询渲染结果全文检索技术Lucene…

为何生命进化的方向是衰老,而不是永生?

来源&#xff1a;科学的乐园永生似乎是全世界各种文化里都在追求的一种状态&#xff0c;为此古代的人们发展出了宗教&#xff0c;用来寄托死亡带来的遗憾。而人类也在想尽一切办法抑制衰老&#xff0c;各种护肤品、保养品相继问世。当然人类在这条追求永生的道路上也吃了很多苦…

NPOI “发现 中的部分内容有问题,是否要恢复此工作薄的内容?如果信任此工作薄的来源。。。”的问题的解决方法...

网上说的方法是调整Sheet可见和顺序&#xff1a;https://blog.csdn.net/hulihui/article/details/21196951 stackoverflow给出的解释是&#xff1a;单元格存储数字&#xff0c;字符串格式不对应造成的&#xff0c;https://stackoverflow.com/questions/15675710/opening-excel-…

vue-day02-vue常用特性

文章目录Vue常用特性表单基本操作表单修饰符自定义指令Vue.directive 注册全局指令Vue.directive 注册全局指令 带参数自定义指令局部指令计算属性 computed侦听器 watch过滤器过滤器中传递参数生命周期常用的 钩子函数数组变异方法替换数组动态数组响应式数据图书列表案例1、 …

2022年,哪些科技趋势将持续改变世界?这里有一份来自百度研究院的预测

来源&#xff1a;数学中国编辑部弹指之间&#xff0c;2021 年已经远去。这一年&#xff0c;新冠疫情全球经济和社会生活带来诸多挑战&#xff0c;同时&#xff0c;科学技术的力量得以持续显现。技术进步与产业发展的速度进一步加快&#xff0c;数字技术、智能技术为人们的生活带…

CSS每日学习笔记(1)

7.30.2019 1.CSS 文本属性 属性 描述 color 设置文本颜色 direction 设置文本方向。 line-height 设置行高。 letter-spacing 设置字符间距。 text-align 对齐元素中的文本。 text-decoration 向文本添加修饰。 text-indent 缩进元素中文本的首行。 text-shado…

vue-day03-vue组件化开发

文章目录组件组件注册全局注册组件基础用组件注意事项局部注册Vue 调试工具Vue组件之间传值父组件向子组件传值子组件向父组件传值兄弟之间的传递组件插槽匿名插槽具名插槽作用域插槽购物车案例1. 实现组件化布局2、实现 标题和结算功能组件3. 实现列表组件删除功能4. 实现组件…

智源发布《人工智能的认知神经基础白皮书》,一览“AI×脑科学”前沿

图. 智源研究院《人工智能的认知神经基础白皮书》&#xff08;2021年&#xff09;来源&#xff1a;智源研究院智源研究院发布 2021 年度《人工智能的认知神经基础白皮书》&#xff0c;兼具专业性与科普性&#xff0c;是人工智能学者探寻“AI脑科学”交叉学科研发创新的导览之作…

解决设置了display:none的元素,会先展示再隐藏

问题&#xff1a;元素明明设置了display:none&#xff0c;但是在刷新页面的时候却会先显示了出来&#xff0c;然后才会隐藏&#xff0c;实现display:none 原因&#xff1a;由于元素渲染的时候&#xff0c;样式还没有应用上去&#xff0c;导致的 解决办法&#xff1a;使用内联样…

Java : Hibernate 动态+分页+自定义字段+自定义实体类查询

// 组合查询public List<ListBookDTO> listSetDSL(PublishingHouse publishingHouse,Integer minDiscount, Integer maxDiscount, Integer minStocks, Integer maxStocks, Integer page, Integer pageSize) { CriteriaBuilder builder em.getCriteriaBuilder(); /…

VR视觉健康标准在穗发布 专家:VR使用不要超过45分钟

来源&#xff1a;VR每日必看近期&#xff0c;“元宇宙”新兴概念备受关注&#xff0c;虚拟现实&#xff08;下称“VR”&#xff09;技术也被国内外媒体评为“第四次工业革命的钥匙之一”。但是&#xff0c;有不少人对VR眼镜等设备感到担忧&#xff1a;使用它会损害视力吗&#…

vue-day04-vue前端交互

文章目录接口调用方式异步promise基于Promise发送Ajax请求Promise 基本API实例方法.then().catch().finally()静态方法.all().race()fetch概览fetch请求参数&#xff08;图片记录&#xff09;fetch API 中的 HTTP 请求fetchAPI 中 响应格式axiosaxios基础用法axios的响应结果ax…

系统上线日期被老外逼得延期了!

在我自己的印象中&#xff0c;一直以来&#xff0c;以为老外做事是比较严禁的&#xff0c;而且是比较高效的&#xff01;没有想到&#xff0c;居然完全不是这么回事&#xff01; a) 不听从中方任何建议&#xff0c;一意孤行&#xff01; b) 事实证明他们的错误之后&#xff0c;…