前置文章
消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序_北岭山脚鼠鼠的博客-CSDN博客
消息队列——rabbitmq的不同工作模式_北岭山脚鼠鼠的博客-CSDN博客
消息队列——spring和springboot整合rabbitmq_北岭山脚鼠鼠的博客-CSDN博客
目录
Work queues 工作队列模式
案例:
在生产者端
在消费者端
结果如下
消费预取限制
发布订阅模型
Fanout Exchange(配置文件实现)
案例
消费者代码
生产者代码
Direct Exchange (注解实现)
案例
消费者代码
生产者代码
Topic Exchange
案例
消费者代码
生产者代码
消息转换器
生产者代码
JSON方式序列化
生产者代码 (jackson)
消费者代码(jackson)
总结
Work queues 工作队列模式
这里用的不是上面第三篇文章里面的定义配置类的形式。
案例:
在生产者端
队列要存在才可以上传。不然代码运行不会报错,但是消息也会不知道发到哪里去。
@Testpublic void testSendMessage2() throws InterruptedException {String queue_Name= "simple.queue";String message="hello 鼠鼠";for(int i=1;i<=50;i++)rabbitTemplate.convertAndSend(queue_Name,message+i);Thread.sleep(20);}
在消费者端
定义了两个消费者监听上面的队列,本来想三个的,但是不知道默认的交换机名字,所以弄了两个。并且根据注解的不同,第一个是可以直接创建一个队列,第二个需要队列已存在才行。
@Component
public class RabbitMQListener {//自动创建队列@RabbitListener(queuesToDeclare=@Queue("simple.queue"))public void ListenerWorkQueue1(Message message) throws InterruptedException {System.out.println("11111"+message.getBody());Thread.sleep(20);}//需要在rabbit_mq上手动创建队列,不然会报错@RabbitListener(queues="simple.queue")public void ListenerWorkQueue2(Message message) throws InterruptedException {System.out.println("22222"+message.getBody());Thread.sleep(200);}//3. 自动创建队列,Exchange 与 Queue绑定
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue("simple.queue"),
// exchange = @Exchange("/") //绑定默认交换机
// ))
// public void ListenerWorkQueue3(Message message) throws InterruptedException {
// System.out.println("33333"+message.getBody());
// Thread.sleep(200);
// }
}
结果如下
两个队列轮流取消息导致反而变慢了。
消费预取限制
要指定队列才有效果。
这里就相当于指定了在simple前缀的队列上每次只能获取一条消息。
运行结果如下,大多数都交给了快的队列执行。
发布订阅模型
Fanout Exchange(配置文件实现)
消息路由到每个绑定的消息队列。
案例
消费者代码
spring读取到这个Bean之后就会向RabbitMq发请求,创建交换机,绑定队列了。
@Configuration
public class FanoutConfig {//itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fannout.queue1");}//绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//fanout.queue1@Beanpublic Queue fanoutQueue2(){return new Queue("fannout.queue2");}//绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
定义两个监听用的方法
@Component
public class RabbitMQListener {
// @RabbitListener(queues="boot_queue")
// public void ListenerQueue(Message message){
// System.out.println(message);
// }//自动创建队列// @RabbitListener(queuesToDeclare=@Queue("simple.queue"))
// public void ListenerWorkQueue1(Message message) throws InterruptedException {
// System.out.println("11111"+message.getBody()+ LocalDateTime.now());
// Thread.sleep(20);
// }
//
// //需要在rabbit_mq上手动创建队列,不然会报错
// @RabbitListener(queues="simple.queue")
// public void ListenerWorkQueue2(Message message) throws InterruptedException {
// System.out.println("22222"+message.getBody()+ LocalDateTime.now());
// Thread.sleep(200);
// }//3. 自动创建队列,Exchange 与 Queue绑定
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue("simple.queue"),
// exchange = @Exchange("/") //绑定默认交换机
// ))
// public void ListenerWorkQueue3(Message message) throws InterruptedException {
// System.out.println("33333"+message.getBody());
// Thread.sleep(200);
// }@RabbitListener(queuesToDeclare=@Queue("fanout.queue1"))public void ListenerFanoutQueue1(Message message) throws InterruptedException {System.out.println("11111"+message.getBody());}@RabbitListener(queuesToDeclare=@Queue("fanout.queue2"))public void ListenerFanoutQueue2(Message message) throws InterruptedException {System.out.println("22222"+message.getBody());}
}
生产者代码
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {//1.注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;// @Test
// public void testSend(){
// rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"boot.haha","hello 鼠鼠");
// }// @Test
// public void testSendMessage2() throws InterruptedException {
// String queue_Name= "simple.queue";
// String message="hello 鼠鼠";
//
// for(int i=1;i<=50;i++)
// rabbitTemplate.convertAndSend(queue_Name,message+i);
// Thread.sleep(20);
// }@Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName="itcast.fanout";//消息String message="hello 鼠鼠";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);}
}
Direct Exchange (注解实现)
案例
消费者代码
@Component
public class RabbitMQListener { @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name="itcast.direct" , type= ExchangeTypes.DIRECT),key={"red","blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者接收到:"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name="itcast.direct" , type= ExchangeTypes.DIRECT),key={"red","yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者接收到:"+msg);}
}
生产者代码
@Testpublic void testSendDirectExchange(){//交换机名称String exchangeName="itcast.direct";//消息String message="hello 鼠鼠";//发送消息rabbitTemplate.convertAndSend(exchangeName,"blue",message);}
此条代码只有绑定了blue这个key的队列才可以收到。
换成red就是两个队列都可以收到了。
Topic Exchange
案例
消费者代码
@RabbitListener(bindings = @QueueBinding(value=@Queue(name="topic.queue1"),exchange=@Exchange(name="itcast.topic",type = ExchangeTypes.TOPIC),key="japan.#"))public void listenTopicQueue1(String msg){System.out.println("消费者接收到:"+msg);}@RabbitListener(bindings = @QueueBinding(value=@Queue(name="topic.queue2"),exchange=@Exchange(name="itcast.topic",type = ExchangeTypes.TOPIC),key="#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者接收到:"+msg);}
生产者代码
@Testpublic void testSendTopicExchange(){//交换机名称String exchangeName="itcast.topic";//消息String message="北岭山脚鼠鼠横死街头,究竟是人性的沦丧还是道德的....";//发送消息rabbitTemplate.convertAndSend(exchangeName,"japan.news",message);}
两个都符合,所以都能收到。
消息转换器
定义一个队列
@Beanpublic Queue objectQueue(){return new Queue("object.queue");}
生产者代码
@Testpublic void testSendObjectQueue(){//消息Map<String,Object> msg=new HashMap<>();msg.put("name","北岭山脚鼠鼠");msg.put("age","22");//发送消息rabbitTemplate.convertAndSend("object.queue",msg);}
可以看见消息被转换成了一长串字符,content_type写着java的序列化。
效率差,安全性也差。
JSON方式序列化
声明好MessageConveter之后就可以自动覆盖默认序列化方式了。
导入一个核心依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
生产者代码 (jackson)
修改生产者的启动类代码,加上一个Bean
@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);}@Beanpublic Jackson2JsonMessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
启动测试类之后可以看见新的消息出现了。
消费者代码(jackson)
然后可以正常接受到消息
如果消费者不使用对应jackson解析的话,代码会报错
总结
推荐使用jackson的方式