RabbitMQ消息队列

目录

网址:

一、项目准备

1.导入依赖

2.抽取工具类

配置的属性在哪里呢

二、代码编写

1.简单模式

生产者

消费者

2.Work queues工作队列模式

生产者

消费者1

 消费者2

3.Publish/Subscribe发布与订阅模式

生产者

消费者1

消费者2

4.Routing路由模式

生产者

消费者1

消费者2

5.Topics通配符模式

生产者

消费者1

消费者2

三、总结


网址:

RabbitMQ官方地址:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQhttp://www.rabbitmq.com/

官网对应模式介绍:RabbitMQ Tutorials — RabbitMQhttps://www.rabbitmq.com/getstarted.html

一、项目准备

1.导入依赖

在maven工程的pom文件中添加依赖。

    <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>

2.抽取工具类

不论是生产者还是消费者都要创建连接工厂和创建连接这连个步骤。

        //1. 创建连接工厂(设置RabbitMQ的连接参数);ConnectionFactory connectionFactory = new ConnectionFactory();//主机;默认localhostconnectionFactory.setHost("localhost");//连接端口;默认5672connectionFactory.setPort(5672);//虚拟主机;默认/connectionFactory.setVirtualHost("/yh");//用户名;默认guestconnectionFactory.setUsername("yh");//密码;默认guestconnectionFactory.setPassword("yh");//2. 创建连接;Connection connection = connectionFactory.newConnection();

为简省代码可抽取一个工具类RabbitGetConnection.java。 

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitGetConnection {public static Connection getConnection(){//1. 创建连接工厂(设置RabbitMQ的连接参数);ConnectionFactory connectionFactory = new ConnectionFactory();//主机;默认localhostconnectionFactory.setHost("localhost");//连接端口;默认5672connectionFactory.setPort(5672);//虚拟主机;默认/connectionFactory.setVirtualHost("/yh");//用户名;默认guestconnectionFactory.setUsername("admin");//密码;默认guestconnectionFactory.setPassword("admin");//2. 创建连接;Connection connection = null;try {connection = connectionFactory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {throw new RuntimeException(e);}return connection;}
}

配置的属性在哪里呢

摁住Ctrl键单击newConnection()方法查看源码

 

二、代码编写

1.简单模式

为了更加直观,前面没有使用抽取出来的工具类。

生产者

package cn.yh.rabbitmq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 简单模式:发送消息*/
public class Producer {static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {//1. 创建连接工厂(设置RabbitMQ的连接参数);ConnectionFactory connectionFactory = new ConnectionFactory();//主机;默认localhostconnectionFactory.setHost("localhost");//连接端口;默认5672connectionFactory.setPort(5672);//虚拟主机;默认/connectionFactory.setVirtualHost("/yh");//用户名;默认guestconnectionFactory.setUsername("yh");//密码;默认guestconnectionFactory.setPassword("yh");//2. 创建连接;Connection connection = connectionFactory.newConnection();//3. 创建频道;Channel channel = connection.createChannel();//4. 声明队列;/*** 参数1:队列名称* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)* 参数3:是否独占本连接* 参数4:是否在不使用的时候队列自动删除* 参数5:其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//5. 发送消息;String message = "你好!小兔纸。";/*** 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)* 参数2:路由key,简单模式中可以使用队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已发送消息:" + message);//6. 关闭资源channel.close();connection.close();}
}

消费者

package cn.yh.rabbitmq.simple;import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** 简单模式;消费者接收消息*/
public class Consumer {public static void main(String[] args) throws Exception {//1. 创建连接工厂;//2. 创建连接;(抽取一个获取连接的工具类)Connection connection = ConnectionUtil.getConnection();//3. 创建频道;Channel channel = connection.createChannel();//4. 声明队列;/*** 参数1:队列名称* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)* 参数3:是否独占本连接* 参数4:是否在不使用的时候队列自动删除* 参数5:其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//5. 创建消费者(接收消息并处理消息);DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//接收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));}};//6. 监听队列/*** 参数1:队列名* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;* 如果设置为false则需要手动确认* 参数3:消费者*/channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);}
}

2.Work queues工作队列模式

生产者

package cn.yh.rabbitmq.work;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 30; i++) {// 发送信息String message = "你好;小兔子!work模式--" + i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已发送消息:" + message);}// 关闭资源channel.close();connection.close();}
}

消费者1

package cn.yh.rabbitmq.work;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//一次只能接收并处理一个消息channel.basicQos(1);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {try {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-
8"));Thread.sleep(1000);//确认消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}
}

 消费者2

package cn.yh.rabbitmq.work;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//一次只能接收并处理一个消息channel.basicQos(1);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {try {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));Thread.sleep(1000);//确认消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}
}

3.Publish/Subscribe发布与订阅模式

发布与订阅模式:一个消息可以被多个消费者接收;一个消费者对于的队列,该队列只能被一个消费者监听。使用了订阅模式中交换机类型为:广播。

生产者

package cn.yh.rabbitmq.ps;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 发布与订阅使用的交换机类型为:fanout*/
public class Producer {//交换机名称static final String FANOUT_EXCHAGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);        //队列绑定交换机channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");for (int i = 1; i <= 10; i++) {// 发送信息String message = "你好;小兔子!发布订阅模式--" + i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());System.out.println("已发送消息:" + message);}// 关闭资源channel.close();connection.close();}
}

消费者1

package cn.yh.rabbitmq.ps;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数         */channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-
8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);}
}

消费者2

package cn.yh.rabbitmq.ps;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);}
}

4.Routing路由模式

Routing 路由模式要求队列绑定到交换机的时候指定路由key;消费发送时候需要携带路由key;只有消息的路由key与队列路由key完全一致才能让该队列接收到消息。

生产者

import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 路由模式的交换机类型为:direct*/
public class Producer {//交换机名称static final String DIRECT_EXCHAGE = "direct_exchange";//队列名称static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";//队列名称static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);//队列绑定交换机channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");// 发送信息String message = "新增了商品。路由模式;routing key 为 insert " ;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());System.out.println("已发送消息:" + message);// 发送信息message = "修改了商品。路由模式;routing key 为 update" ;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());System.out.println("已发送消息:" + message);// 关闭资源channel.close();connection.close();}
}

消费者1

package cn.yh.rabbitmq.routing;import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);}
}

消费者2

package cn.yh.rabbitmq.routing;import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);}
}

5.Topics通配符模式

Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routingkey 的时候可以使用通配符,显得更加灵活。

Topics通配符模式:可以根据路由key将消息传递到对应路由key的队列;队列绑定到交换机的路由key可以有多个;通配符模式中路由key可以使用 *# ;使用了通配符模式之后对于路由Key的配置更加灵活。

生产者

package cn.yh.rabbitmq.topic;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/*** 通配符Topic的交换机类型为:topic*/
public class Producer {//交换机名称static final String TOPIC_EXCHAGE = "topic_exchange";//队列名称static final String TOPIC_QUEUE_1 = "topic_queue_1";//队列名称static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、topic、headers*/channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 发送信息String message = "新增了商品。Topic模式;routing key 为 item.insert " ;channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());System.out.println("已发送消息:" + message);// 发送信息message = "修改了商品。Topic模式;routing key 为 item.update" ;channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());System.out.println("已发送消息:" + message);// 发送信息message = "删除了商品。Topic模式;routing key 为 item.delete" ;channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());System.out.println("已发送消息:" + message);// 关闭资源channel.close();connection.close();}
}

消费者1

package cn.yh.rabbitmq.topic;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, 
"item.update");channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, 
"item.delete");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);}
}

消费者2

package cn.yh.rabbitmq.topic;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);}
}

三、总结

目标:对比总结RabbitMQ的5种模式特征

  • 不直接Exchange交换机(默认交换机)

    1. simple简单模式: 一个生产者、一个消费者,生产者生产消息到一个队列被一个消费者接收

    2. work Queue工作队列模式: 一个生产者、多个消费者(竞争关系),生产者发送消息到一个队列中,可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系

  • 使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)

    1. 发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列

    2. 路由模式:使用了direct定向类型的交换机,消息会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息

    3. 通配符模式:使用了topic通配符类型的交换机,消息会携带路由key,交换机根据消息的路由key与队列的路由key(*, #)进行对比,匹配的话那么该队列可以接收到消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/25212.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

git【潦草学习】

初始配置git 查询版本号 初次使用git前配置用户名与邮箱地址 git config --global user.name "your name" git config --global user.email "your email" git config -l 发现最后两行多出了用户名和邮箱&#xff0c;说明配置成功

【雕爷学编程】Arduino动手做(184)---快餐盒盖,极低成本搭建机器人实验平台3

吃完快餐粥&#xff0c;除了粥的味道不错之外&#xff0c;我对个快餐盒的圆盖子产生了兴趣&#xff0c;能否做个极低成本的简易机器人呢&#xff1f;也许只需要二十元左右 知识点&#xff1a;轮子&#xff08;wheel&#xff09; 中国词语。是用不同材料制成的圆形滚动物体。简…

解决Map修改key的问题

需求 现在返回json数据带有分页的数据&#xff0c;将返回data属性数据变更为content&#xff0c;数据不变&#xff0c;key发生变化 实现1&#xff0c;源数据比较复杂&#xff0c;组装数据比较麻烦 说明&#xff1a;如果使用这种方式完成需求&#xff0c;需要创建对象&#xff0…

VLAN原理+配置

目录 一&#xff0c; 以太网二层交换机 二&#xff0c;三层架构&#xff1a; 三&#xff0c;VLAN配置思路 1.创建vlan 2.接口划入vlan 3.trunk干道 4.vlan间路由器 5.DHCP池塘配置 四&#xff0c;华为VLAN部分的接口模式讲解&#xff1a; 五&#xff0c;华为VLAN部分的…

mysql二进制方式升级8.0.34

一、概述 mysql8.0.33 存在如下高危漏洞&#xff0c;需要通过升级版本修复漏洞 Oracle MySQL Cluster 安全漏洞(CVE-2023-0361) mysql/8.0.33 Apache Skywalking <8.3 SQL注入漏洞 二、查看mysql版本及安装包信息 [rootlocalhost mysql]# mysql -V mysql Ver 8.0.33 fo…

Eureka增加账号密码认证登录

一、业务背景 注册中心Eureka在微服务开发中经常使用到&#xff0c;用来管理发布的微服务&#xff0c;供前端或者外部调用。但是如果放到生产环境&#xff0c;我们直接通过URL访问的话&#xff0c;这显然是不安全的。 所以需要给注册中心加上登录认证。 通过账号和密码认证进行…

【计算机网络】socket编程

文章目录 1. 网络通信的理解2.进程PID可以取代端口号吗&#xff1f;3. 认识TCP协议4. 认识 UDP协议5. socket编程接口udp_server.hpp的代码解析socket——创建 socket 文件描述符Initserver——初始化1.创建套接字接口&#xff0c;打开网络文件bind——绑定的使用 2.给服务器指…

[webpack] 基本配置 (一)

文章目录 1.基本介绍2.功能介绍3.简单使用3.1 文件目录和内容3.2 下载依赖3.3 启动webpack 4.基本配置4.1 五大核心概念4.2 基本使用 1.基本介绍 Webpack 是一个静态资源打包工具。它会以一个或多个文件作为打包的入口, 将我们整个项目所有文件编译组合成一个或多个文件输出出去…

webpack基础知识八:说说如何借助webpack来优化前端性能?

一、背景 随着前端的项目逐渐扩大&#xff0c;必然会带来的一个问题就是性能 尤其在大型复杂的项目中&#xff0c;前端业务可能因为一个小小的数据依赖&#xff0c;导致整个页面卡顿甚至奔溃 一般项目在完成后&#xff0c;会通过webpack进行打包&#xff0c;利用webpack对前…

医疗器械研发中的可用性工程实践(一)

致读者&#xff1a;以前看《楚门的世界》&#xff0c;《蝴蝶效应》&#xff0c;《肖申克的救赎》&#xff0c;《教父》&#xff0c;《横道世之介》&#xff0c;《老友记》&#xff0c;一个人的一生匆匆。作为平凡人就是历史大河中的浪花&#xff0c;顺势而为&#xff0c;起起伏…

算法与数据结构(二十一)二叉树(纲领篇)

备注&#xff1a;本文旨在通过 labuladong 的二叉树&#xff08;纲领篇&#xff09;理解框架思维&#xff0c;用于个人笔记及交流学习&#xff0c;版权归原作者 labuladong 所有&#xff1b; 我刷了这么多年题&#xff0c;浓缩出二叉树算法的一个总纲放在这里&#xff0c;也许…

ELK企业级日志分析系统

目录 一、ELK 概述 1.ElasticSearch 2.Kiabana 3.Logstash 可以添加的其它组件 1.Filebeat 2.Fluentd 三、为什么要使用 ELK 四、ELK 的工作原理 五、 ELK Elasticsearch 集群部署 更改主机名、配置域名解析、查看Java环境 部署 Elasticsearch 软件 修改elasticsearc…

爬虫获取电影数据----以沈腾参演电影为例

数据可视化&分析实战 1.1 沈腾参演电影数据获取 文章目录 数据可视化&分析实战前言1. 网页分析2. 构建数据获取函数2.1 网页数据获取函数2.2 网页照片获取函数 3. 获取参演影视作品基本数据4. 电影详细数据获取4.1 导演、演员、描述、类型、投票人数、评分信息、电影海…

Wisej.NET Crack,Wisej.NET的核心功能

Wisej.NET Crack&#xff0c;Wisej.NET的核心功能 Wisej.NET是一个跨平台的web框架&#xff0c;用于使用.NET和C#/VB.NET而不是HTML和JavaScript构建现代HTML5应用程序。它包含创建任务关键型web应用程序所需的一切&#xff0c;包括UI组件、会话处理、状态管理和后端集成。借助…

单元测试之 - Spring框架提供的单元/集成测试注解

Spring框架提供了很多注解来辅助完成单元测试和集成测试(备注&#xff1a;这里的集成测试指容器内部的集成测试&#xff0c;非系统间的集成测试)&#xff0c;先看看Spring框架提供了哪些注解以及对应的作用。RunWith(SpringRunner.class) / ExtendWith(SpringExtension.class)&…

设计模式行为型——备忘录模式

目录 什么是备忘录模式 备忘录模式的实现 备忘录模式角色 备忘录模式类图 备忘录模式举例 备忘录模式代码实现 备忘录模式的特点 优点 缺点 使用场景 注意事项 实际应用 什么是备忘录模式 备忘录模式&#xff08;Memento Pattern&#xff09;又叫做快照模式&#x…

高并发负载均衡---LVS

目录 前言 一&#xff1a;负载均衡概述 二&#xff1a;为啥负载均衡服务器这么快呢&#xff1f; ​编辑 2.1 七层应用程序慢的原因 2.2 四层负载均衡器LVS快的原因 三&#xff1a;LVS负载均衡器的三种模式 3.1 NAT模式 3.1.1 什么是NAT模式 3.1.2 NAT模式实现LVS的缺点…

openwr折腾记7-Frpc使用自主域名解析透传本地服务免费不断线的探索

Frpc使用自主域名解析透传本地服务 综述frp透传http服务结构流程 第一部分openwrt-frpc客户端配置和使用指定服务器指定规则在自己的域名运营商处添加域名解析 第二部分shell编码实现frp自由切换服务器并更新dns解析获取切换服务器参数脚本实现切换脚本更新DNS解析打开openwrt计…

MySQL — InnoDB事务

文章目录 事务定义事务特性事务隔离级别READ UNCOMMITTEDREPEATABLE READREAD COMMITTEDSERIALIZABLE 事务存在的问题脏读&#xff08;Dirty Read&#xff09;不可重复读&#xff08;Non-repeatable Read&#xff09;幻读&#xff08;Phantom Read&#xff09; 事务定义 数据库…

(十三)大数据实战——hadoop集群之YARN高可用实现自动故障转移

前言 本节内容是关于hadoop集群下yarn服务的高可用搭建&#xff0c;以及其发生故障转移的处理&#xff0c;同样需要依赖zookeeper集群的实现&#xff0c;实现该集群搭建时&#xff0c;我们要预先保证zookeeper集群是启动状态。yarn的高可用同样依赖zookeeper的临时节点及监控&…