一、简单模式
简介
简单模式 HelloWorld。一个生产者、一个消费者,不需要设置交换机使用默认的交换机。
代码示例
- 生产者
public class Producer {//队列名称private final static String QUEUE_NAME = "hello";public static void main(String[] args) {//建立连接工厂ConnectionFactory factory = new ConnectionFactory();//设置目标主机ipfactory.setHost("192.168.47.128");//设置账号名密码factory.setUsername("yf");factory.setPassword("123456"); // 修改端口的设置 // factory.setPort();try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//通道和队列的连接/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)参数说明:queue:队列名称durable:是否持久化exclusive:是否独占,是否一个消费者监听一个队列autoDelete:是否自动删除。如果没有消费者consumer,自动删除掉队列arguments:参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 10; i++) {//需要发送的消息String message = "Hello RabbitMQ!"+i;//通过最基础的发布/*basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)参数说明:exchange:指定交换机,如果使用默认模式,就使用“”routingKey:路由名称props:配置信息body:发送的消息(要求字节数组)*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}} catch (Exception e) {e.printStackTrace();}} }
- 消费者
/*** 服务端,接收信息*/ public class Consumer {//指定接收队列名称private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//设置目标主机ipfactory.setHost("192.168.47.128");//设置用户密码factory.setUsername("yf");factory.setPassword("123456");//建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {String msg = new String(message.getBody(), "UTF-8");System.out.println(" [x] Received '" + msg + "'");}};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});} }
二、工作队列模式
简介
- 工作队列与简单模式相比,一个生产者、多个消费者(排它关系),多个消费端共同消费同一个队列中的消息
- 使用场景:对于消息生产速度大于消费速度场景,可以增加消费者减少单个消费者压力
代码示例
- 生产者
public class Producer {//队列名称private final static String QUEUE_NAME = "work_queues";public static void main(String[] args) {//建立连接工厂ConnectionFactory factory = new ConnectionFactory();//设置目标主机ipfactory.setHost("192.168.47.128");//设置账号名密码factory.setUsername("yf");factory.setPassword("123456"); // 修改端口的设置 // factory.setPort();try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//通道和队列的连接/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)参数说明:queue:队列名称durable:是否持久化exclusive:是否独占,是否一个消费者监听一个队列autoDelete:是否自动删除。如果没有消费者consumer,自动删除掉队列arguments:参数*/ // channel.basicQos(1);// 如果你的消息还没有确认,那么我同一时间只能给你发送一条消息channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 10; i++) {//需要发送的消息String message = "Hello RabbitMQ!"+i;//通过最基础的发布/*basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)参数说明:exchange:指定交换机,如果使用默认模式,就使用“”routingKey:路由名称props:配置信息body:发送的消息(要求字节数组)*/channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}} catch (Exception e) {e.printStackTrace();}} }
- 消费者
/*** 服务端,接收信息*/ public class Consumer1 {//指定接收队列名称private final static String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//设置目标主机ipfactory.setHost("192.168.47.128");//设置用户密码factory.setUsername("yf");factory.setPassword("123456");//建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.basicQos(1); // 如果你的消息还没有确认,那么我同一时间只能给你发送一条消息channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {String msg = new String(message.getBody(), "UTF-8");System.out.println(" [x] Received '" + msg + "'");}};/* 这种写法和上面是一样的 使用的是lambda表达式DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody(), "UTF-8");System.out.println(" [x] Received '" + msg + "'");};*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});} }
小结
- 分发机制:轮询分发机制
- 也就是说当生产者生产了10条消息,2个消费者分别消费5条消息。
- 应用场景:同一条消息,在多个消费者之间只能有一个消费,应用于只需要单节点消费的场景
- 发送验证码
- 发送生日提醒
三、发布订阅模式(Publish/Subscribe)
简介
- 在订阅模型中,多了一个Exchange 角色:
- Exchange:交换机(X)。接收生产者发送的消息; 处理投递消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。具体操作根据交换机类型来定义:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
代码示例
- 生产者
- 消费者
该文章还没写完,先发布出来,后面会持续更新!!!