Work消息模型
* work模型:
* 多个消费者消费同一个队列中的消息,每个消费者获取到的消息唯一,且只能消费一次
* 作用:提高消息的消费速度,避免消息的堆积
* 默认采用轮询的方式分发消息
* 如果某个消费者处理消息慢,会导致消息堆积
生产者
package com.example.demo02.mq.work;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** @author Allen* 4/10/2024 9:37 PM* @version 1.0* @description: work模式发送者** work模型:* 多个消费者消费同一个队列中的消息,每个消费者获取到的消息唯一,且只能消费一次* 作用:提高消息的消费速度,避免消息的堆积* 默认采用轮询的方式分发消息* 如果某个消费者处理消息慢,会导致消息堆积*/
public class WorkSender {public static void main(String[] args) throws Exception {
// 1:获取连接Connection connection = ConnectionUtils.getConnection();
// 2:创建通道Channel channel = connection.createChannel();
// 3:声明队列// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性channel.queueDeclare("work.queue", false, false, false, null);
// 4:发送100条消息
for (int i = 0; i < 100; i++) {String msg = "work模式消息" + i;//休眠i*5毫秒TimeUnit.MILLISECONDS.sleep(i * 5);// 参数1:交换机名称 参数2:队列名称 参数3:消息的其他属性 参数4:消息的内容channel.basicPublish("", "work.queue", null, msg.getBytes());System.out.println("work模式发送消息:" + msg);}
// 5:关闭通道channel.close();
// 6:关闭连接connection.close();}
}
消费者1
(能者多劳角色)
package com.example.demo02.mq.work;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @author Allen* 4/10/2024 9:37 PM* @version 1.0* @description: work模式消费者1号*/
public class WorkReciver1 {public static void main(String[] args) throws Exception {// 1:获取连接Connection connection = ConnectionUtils.getConnection();// 2:创建通道Channel channel = connection.createChannel();// 3:声明队列// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性channel.queueDeclare("work.queue", false, false, false, null);// 4:定义消费者,消费消息// 参数1:队列名称 参数2:是否自动确认消息 参数3:消费者对象Consumer consumer = new DefaultConsumer(channel) {// 消费者接收消息调用此方法// 参数1:消费者标签 参数2:队列参数 参数3:消息属性 参数4:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 获取消息String msg = new String(body);System.out.println("work模式消费者1号接收消息:" + msg);channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume("work.queue", false, consumer);}
}
消费者2
(消费能力差)
package com.example.demo02.mq.work;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** @author Allen* 4/10/2024 9:37 PM* @version 1.0* @description: work模式消费者1号*/
public class WorkReciver2 {public static void main(String[] args) throws Exception {// 1:获取连接Connection connection = ConnectionUtils.getConnection();// 2:创建通道Channel channel = connection.createChannel();// 3:声明队列// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性channel.queueDeclare("work.queue", false, false, false, null);//如果此消费者性能较差,配置能者多劳:指定一次获取几条信息,消息消费成功后 ack之后 mq才会发送下一条消息channel.basicQos(1);// 4:定义消费者,消费消息// 参数1:队列名称 参数2:是否自动确认消息 参数3:消费者对象Consumer consumer = new DefaultConsumer(channel) {// 消费者接收消息调用此方法// 参数1:消费者标签 参数2:队列参数 参数3:消息属性 参数4:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//模拟二号消费者处理消息慢try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 获取消息:执行业务String msg = new String(body);System.out.println("work模式消费者2号接收消息:" + msg);channel.basicAck(envelope.getDeliveryTag(), false);}};// 参数1:队列名称 参数2:ACK是否自动确认 参数3:消费者对象//必须手动确认消息,否则会报406错误channel.basicConsume("work.queue", false, consumer);}
}
结果:
能者多劳