Work Queues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
代码实现
抽取工具类
public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("182.92.234.71");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;} }
启动两个工作线程
package com.atguigu.two;import com.atguigu.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/**** 这是一个工作线程(相当于消费者)*/
public class Worker01 {//队列的名称public static final String QUEUE_NAME = "hello";//接收消息public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消息的接收DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("接收到的消息:"+ new String (message.getBody()));};//消息接收被取消时,执行下面的内容CancelCallback cancelCallback = consumerTag->{System.out.println(consumerTag+ "消息被消费者取消消费接口回调逻辑");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表的自动应答 false 代表手动应答* 3.当消息传达到后(成功之后)的回调* 4.消费者取消消费的回调*/System.out.println("C1等待接收消息......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}
生产者代码
package com.atguigu.two;import com.atguigu.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;import java.util.Scanner;/*** 生产者 发送大量的消息**/
public class Task01 {//队列名称public static final String QUEUE_NAME = "hello";//发送大量消息public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//队列的声明/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化(磁盘) 默认情况消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行消费共享,false可以多个消费者消费 true:只能一个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接受信息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();/**** 发送一个消息* 1.发送到哪个交换机* 2.路由的Key值是哪个 本次是队列的名称* 3.其他参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成:"+message);}}}
测试