目录
网址:
一、项目准备
1.导入依赖
2.抽取工具类
配置的属性在哪里呢
二、代码编写
1.简单模式
生产者
消费者
2.Work queues工作队列模式
生产者
消费者1
消费者2
3.Publish/Subscribe发布与订阅模式
生产者
消费者1
消费者2
4.Routing路由模式
生产者
消费者1
消费者2
5.Topics通配符模式
生产者
消费者1
消费者2
三、总结
网址:
RabbitMQ官方地址:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQhttp://www.rabbitmq.com/
官网对应模式介绍:RabbitMQ Tutorials — RabbitMQhttps://www.rabbitmq.com/getstarted.html
一、项目准备
1.导入依赖
在maven工程的pom文件中添加依赖。
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>
2.抽取工具类
不论是生产者还是消费者都要创建连接工厂和创建连接这连个步骤。
//1. 创建连接工厂(设置RabbitMQ的连接参数);ConnectionFactory connectionFactory = new ConnectionFactory();//主机;默认localhostconnectionFactory.setHost("localhost");//连接端口;默认5672connectionFactory.setPort(5672);//虚拟主机;默认/connectionFactory.setVirtualHost("/yh");//用户名;默认guestconnectionFactory.setUsername("yh");//密码;默认guestconnectionFactory.setPassword("yh");//2. 创建连接;Connection connection = connectionFactory.newConnection();
为简省代码可抽取一个工具类RabbitGetConnection.java。
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitGetConnection {public static Connection getConnection(){//1. 创建连接工厂(设置RabbitMQ的连接参数);ConnectionFactory connectionFactory = new ConnectionFactory();//主机;默认localhostconnectionFactory.setHost("localhost");//连接端口;默认5672connectionFactory.setPort(5672);//虚拟主机;默认/connectionFactory.setVirtualHost("/yh");//用户名;默认guestconnectionFactory.setUsername("admin");//密码;默认guestconnectionFactory.setPassword("admin");//2. 创建连接;Connection connection = null;try {connection = connectionFactory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {throw new RuntimeException(e);}return connection;}
}
配置的属性在哪里呢
摁住Ctrl键单击newConnection()方法查看源码
二、代码编写
1.简单模式
为了更加直观,前面没有使用抽取出来的工具类。
生产者
package cn.yh.rabbitmq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 简单模式:发送消息*/
public class Producer {static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {//1. 创建连接工厂(设置RabbitMQ的连接参数);ConnectionFactory connectionFactory = new ConnectionFactory();//主机;默认localhostconnectionFactory.setHost("localhost");//连接端口;默认5672connectionFactory.setPort(5672);//虚拟主机;默认/connectionFactory.setVirtualHost("/yh");//用户名;默认guestconnectionFactory.setUsername("yh");//密码;默认guestconnectionFactory.setPassword("yh");//2. 创建连接;Connection connection = connectionFactory.newConnection();//3. 创建频道;Channel channel = connection.createChannel();//4. 声明队列;/*** 参数1:队列名称* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)* 参数3:是否独占本连接* 参数4:是否在不使用的时候队列自动删除* 参数5:其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//5. 发送消息;String message = "你好!小兔纸。";/*** 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)* 参数2:路由key,简单模式中可以使用队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已发送消息:" + message);//6. 关闭资源channel.close();connection.close();}
}
消费者
package cn.yh.rabbitmq.simple;import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** 简单模式;消费者接收消息*/
public class Consumer {public static void main(String[] args) throws Exception {//1. 创建连接工厂;//2. 创建连接;(抽取一个获取连接的工具类)Connection connection = ConnectionUtil.getConnection();//3. 创建频道;Channel channel = connection.createChannel();//4. 声明队列;/*** 参数1:队列名称* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)* 参数3:是否独占本连接* 参数4:是否在不使用的时候队列自动删除* 参数5:其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//5. 创建消费者(接收消息并处理消息);DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//接收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));}};//6. 监听队列/*** 参数1:队列名* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;* 如果设置为false则需要手动确认* 参数3:消费者*/channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);}
}
2.Work queues工作队列模式
生产者
package cn.yh.rabbitmq.work;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 30; i++) {// 发送信息String message = "你好;小兔子!work模式--" + i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已发送消息:" + message);}// 关闭资源channel.close();connection.close();}
}
消费者1
package cn.yh.rabbitmq.work;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//一次只能接收并处理一个消息channel.basicQos(1);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {try {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-
8"));Thread.sleep(1000);//确认消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}
}
消费者2
package cn.yh.rabbitmq.work;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//一次只能接收并处理一个消息channel.basicQos(1);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {try {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));Thread.sleep(1000);//确认消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}
}
3.Publish/Subscribe发布与订阅模式
发布与订阅模式:一个消息可以被多个消费者接收;一个消费者对于的队列,该队列只能被一个消费者监听。使用了订阅模式中交换机类型为:广播。
生产者
package cn.yh.rabbitmq.ps;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 发布与订阅使用的交换机类型为:fanout*/
public class Producer {//交换机名称static final String FANOUT_EXCHAGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null); //队列绑定交换机channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");for (int i = 1; i <= 10; i++) {// 发送信息String message = "你好;小兔子!发布订阅模式--" + i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());System.out.println("已发送消息:" + message);}// 关闭资源channel.close();connection.close();}
}
消费者1
package cn.yh.rabbitmq.ps;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数 */channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-
8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);}
}
消费者2
package cn.yh.rabbitmq.ps;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);}
}
4.Routing路由模式
Routing 路由模式要求队列绑定到交换机的时候指定路由key;消费发送时候需要携带路由key;只有消息的路由key与队列路由key完全一致才能让该队列接收到消息。
生产者
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 路由模式的交换机类型为:direct*/
public class Producer {//交换机名称static final String DIRECT_EXCHAGE = "direct_exchange";//队列名称static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";//队列名称static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);//队列绑定交换机channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");// 发送信息String message = "新增了商品。路由模式;routing key 为 insert " ;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());System.out.println("已发送消息:" + message);// 发送信息message = "修改了商品。路由模式;routing key 为 update" ;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());System.out.println("已发送消息:" + message);// 关闭资源channel.close();connection.close();}
}
消费者1
package cn.yh.rabbitmq.routing;import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);}
}
消费者2
package cn.yh.rabbitmq.routing;import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);}
}
5.Topics通配符模式
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routingkey 的时候可以使用通配符,显得更加灵活。
Topics通配符模式:可以根据路由key将消息传递到对应路由key的队列;队列绑定到交换机的路由key可以有多个;通配符模式中路由key可以使用 *
和 #
;使用了通配符模式之后对于路由Key的配置更加灵活。
生产者
package cn.yh.rabbitmq.topic;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/*** 通配符Topic的交换机类型为:topic*/
public class Producer {//交换机名称static final String TOPIC_EXCHAGE = "topic_exchange";//队列名称static final String TOPIC_QUEUE_1 = "topic_queue_1";//队列名称static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、topic、headers*/channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 发送信息String message = "新增了商品。Topic模式;routing key 为 item.insert " ;channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());System.out.println("已发送消息:" + message);// 发送信息message = "修改了商品。Topic模式;routing key 为 item.update" ;channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());System.out.println("已发送消息:" + message);// 发送信息message = "删除了商品。Topic模式;routing key 为 item.delete" ;channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());System.out.println("已发送消息:" + message);// 关闭资源channel.close();connection.close();}
}
消费者1
package cn.yh.rabbitmq.topic;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,
"item.update");channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,
"item.delete");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);}
}
消费者2
package cn.yh.rabbitmq.topic;
import cn.yh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消
息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);}
}
三、总结
目标:对比总结RabbitMQ的5种模式特征
-
不直接Exchange交换机(默认交换机)
-
simple简单模式: 一个生产者、一个消费者,生产者生产消息到一个队列被一个消费者接收
-
work Queue工作队列模式: 一个生产者、多个消费者(竞争关系),生产者发送消息到一个队列中,可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系
-
-
使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)
-
发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列
-
路由模式:使用了direct定向类型的交换机,消息会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息
-
通配符模式:使用了topic通配符类型的交换机,消息会携带路由key,交换机根据消息的路由key与队列的路由key(*, #)进行对比,匹配的话那么该队列可以接收到消息
-