RabbitMQ入门笔记

一、写在前面

  • 什么是消息队列?

顾名思义,消息队列就是一个能够存放消息的队列,通常有一个生产者生产消息,一个或多个消费者消费消息。

消息队列在分布式系统中运用十分广泛,有异步处理、应用解耦、流量削峰等用途。

当然,RabbitMQ不是消息队列的唯一选择,除它以外还有在大数据中十分常见的Kafka、阿里的RocketMQ、Apache的ActiveMQ,甚至Redis也可以当成消息队列使用。

不过,正常情况下RabbitMQ是比较通用的选择。

  • RabbitMQ基础模型

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DPuLCnGj-1693195054817)(http://www.siyemao.xyz/images/1640611813402.png)]

上面是rabbitmq的基础模型,可以看到Server包裹着虚拟主机,一般来说,虚拟主机和用户相对应,一个Rabbitmq可以有多个虚拟主机、多个用户,以对应不同的系统不同的功能模块。

此外,虚拟主机还包含着交换机和数个队列,生产者会将消息交给交换机,然后队列通过某种规则从交换机中取得消息,再由消费者拿到消息。当然,生产者是否要将消息交给交换机,取决于使用了哪种消息模型。

二、消息模型

  • 1.HelloWorld(直连)模型。

    一个生产者生产消息交给队列,消费者通过队列取得消息,非常简单的一对一消息模型。

    首先编写一个获取RabbitMQ工厂对象的工具类:

    package utils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    public class RabbitMQUtil {private static ConnectionFactory connectionFactory;static {connectionFactory = new ConnectionFactory();connectionFactory.setHost("www.siyemao.xyz");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/mao");connectionFactory.setUsername("siye");connectionFactory.setPassword("siye");}//定义提供连接对象的方法public static Connection getConnection() {try {return connectionFactory.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}//定义关闭通道和关闭连接工具的方法public static void closeConnectionAndChanel(Channel channel,Connection connection){try{if (channel != null) channel.close();if (connection != null) connection.close();}catch (Exception e){e.printStackTrace();}}
    }
    

    接着再创建生产者类和消费者类:

    public class Provider {public static void main(String args[]) throws IOException, TimeoutException {Connection connection = RabbitMQUtil.getConnection();//通过连接获取通道Channel channel = connection.createChannel();//通道绑定对应消息队列//参数1:队列名称,如果队列不存在自动创建//参数2:队列是否持久化,true持久化队列,false不持久化//参数3:是否独占队列,true独占队列,false不独占//参数4:是否再消费完成后自动删除队列//参数5:额外附加参数channel.queueDeclare("hello",true,false,false,null);//发布消息//参数1:交换机名称,参数2:队列名称,参数3:传递消息额外设置(如果想要消息持久化,则使用MessageProperties.PERSISTENT_TEXT_PLAIN),参数4:消息具体类容channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello world".getBytes());RabbitMQUtil.closeConnectionAndChanel(channel,connection);}
    }
    
    public class Consumer {public static void main(String args[]) throws IOException, TimeoutException {//获取连接工厂Connection connection = RabbitMQUtil.getConnection();//创建通道Channel channel = connection.createChannel();//通道绑定对象channel.queueDeclare("hello",true,false,false,null);//消费消息//参数1:消费哪个队列 队列名称//参数2:开启消息的自动确认机制//参数3:消费时的回调接口channel.basicConsume("hello",true,new DefaultConsumer(channel){@Override//最后一个参数:消息队列中取出的消息public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("获取消息 " + new String(body));}});}
    }
    

    运行生产者类,然后再运行消费者类,就可以看到,消费者类拿到生产者的消息了~

  • 2.work quene(工作队列)模型

    一个消费者消费的速度还赶不上生产的速度,GDP上不来,那怎么办,创造多个消费者好了。

    一样还是有一个生产者:

    public class Provider {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();//通过通道声明队列channel.queueDeclare("work", true, false, false, null);//生产消息for (int i = 0; i < 10; i++) {channel.basicPublish("", "work", null, (i+"hello workqueue").getBytes());}//关闭资源RabbitMQUtil.closeConnectionAndChanel(channel, connection);}
    }
    

    不同的是有两个消费者:

    public class Consumer1 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();channel.basicQos(1);//一次只能消费一个消息channel.queueDeclare("work",true,false,false,null);//参数2:消费者自动向rabbitmq确认消息消费,这里设置为false,避免消费者死掉后消息随之消失channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者-1:" + new String(body));//手动确认消息消费完毕,参数1:确认标识 参数2:false每次确认一个channel.basicAck(envelope.getDeliveryTag(),false);}});}
    }
    
    public class Consumer2 {……代码跟Consumer1一样
    }
    

    可以看到,和直连模型相比,工作模型并没有多出什么额外的东西,只是新增了一个消费者而已。

    需要注意的是,为了避免消费者拿到消息后死掉,使消息也丢失,我们需要将通道设置为每次只能消费一个消息(channel.basicQos(1)),同时将消费消息时的第二个参数设置为false,取消自动确认,改为在消费完毕后手动确认。

    另外。

    默认情况下,消费者们采用轮询的方式获取消息,你一条我一条,哪怕其中一个消费者效率很慢也是这样。但按照上述设置后,就可以实现能者多劳的效果。

  • 3.Fanout(广播)模型

    在广播模型中,可以有多个消费者,每个消费者有自己的临时队列,生产者的消息交给交换机,交换机来决定交给哪个消费者的队列。一般情况下所有的消费者都能拿到消息,实现一条消息被多个消费者消费。

    同样有一个生产者,但是生产者不声明队列,而是声明并指向交换机:

    public class Provider {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();//将通道指向交换机 //参数1:交换机名称 参数2:交换机类型channel.exchangeDeclare("logs","fanout");//发送消息 参数1:交换机名称 参数2:路由key,在广播模型中可以忽略 参数3:消息体channel.basicPublish("logs","",null,"fanout type messgae".getBytes());RabbitMQUtil.closeConnectionAndChanel(channel,connection);}
    }
    

    消费者同样绑定交换机,并创建临时队列,临时队列绑定交换机:

    public class Consumer1 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("logs","fanout");//创建临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queueName,"logs","");//消费消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者-1:" + new String(body));}});}
    }
    

    消费者2、消费者3都是同样的代码,这里就省略掉了。

    此时启动消费者,然后再启动生产者,就能看到所有消费者都拿到了同一条消息。

  • Routing(路由)模型

    在广播模型中,所有消费者都能拿到消息,但是如果我们不想让他拿到呢?如果我们想要不同的消费去拿不同类型的消息呢?这个时候就能用到路由模型了。

    • 4.路由之订阅模型-Direct(直连/订阅制)

      在直连模型下,队列不能直接绑定交换机了,而是要指定一个routerkey,消息发送方在向交换机发送消息时也必须指定routerkey,这样交换机在把消息交给队列时,就会通过routerkey进行判断,实现”订阅制“。

      和上面的所有模型一样,首先创建一个生产者:

      public class Provider {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();//通过通道声明交换机,参数1:交换机名称 参数2:路由模式channel.exchangeDeclare("logs_direct","direct");//发送消息String routerKey = "info";channel.basicPublish("logs_direct",routerKey,null,("这是direct模型发布的基于RouterKey:[" + routerKey + "]发送的消息").getBytes());RabbitMQUtil.closeConnectionAndChanel(channel,connection);}
      }
      

      可以看到,这次指定了模式为direct路由模式,同时指定routerkey为info。

      创建消费者1和消费者2:

      public class Consumer1 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs_direct","direct");//创建临时队列String queueName = channel.queueDeclare().getQueue();//基于routerkey绑定交换机channel.queueBind(queueName,"logs_direct","error");//消费消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1:" + new String(body));}});}
      }
      
      public class Consumer2 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs_direct","direct");//创建临时队列String queueName = channel.queueDeclare().getQueue();//基于routerkey绑定交换机channel.queueBind(queueName,"logs_direct","info");channel.queueBind(queueName,"logs_direct","error");channel.queueBind(queueName,"logs_direct","warning");//消费消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2:" + new String(body));}});}
      }
      

      很明显的,消费者1只指定了一个routerkey:error,而消费者2则是有多个routerkey:info、error、warning……如果此时运行消费者1和2,然后运行生产者,消费者1空手而归,而消费者2就会拿到消息。这是因为2绑定了和生产者一样的routerkey:info。

    • 5.Routing路由模型之Topics(动态路由)模型

      在direct订阅模型中,如果需要接受多种消息,就要指定多个routerkey,如果数量一多就很很麻烦,这种情况就能使用topics模型。

      public class Provider {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();//声明交换机和交换机类型channel.exchangeDeclare("topics","topic");//发布消息String routerKey = "user.save.info";channel.basicPublish("topics",routerKey,null,("这里是topic动态路由模型,routerkey:[" + routerKey + "]发布的消息").getBytes());RabbitMQUtil.closeConnectionAndChanel(channel,connection);}
      }
      
      public class Consumer1 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtil.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("topics","topic");//创建临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列 动态通配符形式的routerkeychannel.queueBind(queueName,"topics","user.#");channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1:" + new String(body));}});}
      }
      

      可以看到,消费者能够通过*或#通配符来动态指定routerkey,来匹配生产者user.info.xx这种形式,这样就比direct模型要灵活强大很多。

      注:*匹配一个词,#匹配多个。

  • 6.RPC模型

    RPC模型并不适用于消息队列,而是用于RPC通信,所以这里就暂时不进行学习了。

  • 7.PublisherConfirms(发布和订阅模型)

    这是新出的一种模型,并没有得到广泛使用,也暂时不学习了。

三、Spirngboot整合RabbitMQ

和jdbc一样,rabbitmq的这种原始方式虽然可用,但是同样会产生大量样板代码,所以Springboot封装了一套东西供我们使用(所以上面这么多内容时闹什么呢……)。

  • 1.初来乍到Hello直连模型

    使用RabbitTemplate可以轻松发送消息:

    @SpringBootTest(classes = RabbitMQApplication.class)
    @RunWith(SpringRunner.class)
    public class TestRabbitMQ {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){rabbitTemplate.convertAndSend("hello","helloworld");}
    }
    

    在消费者类上使用注解@RabbitListener可以监听消息,具体使用如下:

    @Component
    //声明并创建队列
    @RabbitListener(queuesToDeclare = @Queue(value = "hello"))
    //也可以设置队列是否持久化、是否自动删除等参数
    //@RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "true"))
    public class HelloConsumer {//具体接收消息的方法@RabbitHandlerpublic void receivel(String message){System.out.println("message: " + message);}
    }
    
  • 2.Work模型

    @RabbitListener不止可以用在类上,还可以用在方法上,表明这个方法就是一个消费者,这样就能造出多个消费者,实现Work模型了。

    //work模型@Testpublic void testWork(){for (int i = 0;i < 10;i++){rabbitTemplate.convertAndSend("work","work模型" + i);}}
    
    @Component
    public class WorkConsumer {@RabbitListener(queuesToDeclare = @Queue("work"))public void receiver1(String message){System.out.println("消费者1:" + message);}@RabbitListener(queuesToDeclare = @Queue("work"))public void receiver2(String message){System.out.println("消费者2:" + message);}
    }
    
  • 3.Fanout广播模型

    //Fanout模型@Testpublic void testFanout(){rabbitTemplate.convertAndSend("logs","","Fanout广播模型");}
    
    @Component
    public class FanoutConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue,//创建临时队列exchange = @Exchange(value = "logs",type = "fanout"))//绑定交换机})public void receive1(String message){System.out.println("消费者1:" + message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,//创建临时队列exchange = @Exchange(value = "logs",type = "fanout"))//绑定交换机})public void receive2(String message){System.out.println("消费者2:" + message);}
    }
    
  • 4.路由direct模型

        //路由direct模型@Testpublic void testDirect(){rabbitTemplate.convertAndSend("directs","info","路由direct模型");}
    
    @Component
    public class DirectConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "directs",type = "direct"),key = {"info","error","warn"})})public void receiv1(String message){System.out.println("消费者1:" + message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "directs",type = "direct"),key = {"error"})})public void receiv2(String message){System.out.println("消费者2:" + message);}
    }
    
  • 5.动态路由topic模型

    //动态路由topic模型@Testpublic void testTopic(){rabbitTemplate.convertAndSend("topics","user.save","动态路由topic模型");}
    
    @Component
    public class TopicConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(type = "topic",name = "topics"),key = {"user.save","user.*"})})public void receive1(String message){System.out.println("消费者1:" + message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(type = "topic",name = "topics"),key = {"user.delete"})})public void receive2(String message){System.out.println("消费者2:" + message);}
    }
    

四、RabbitMQ集群

待更新……

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58202.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Vue3】transition 组件

1. 基础用法 <template><div class"content"><button click"flag !flag">switch</button><transition name"fade"><div v-if"flag" class"box"></div></transition><…

学习c++的第6天

#include <iostream> using namespace std; class Animal { public: virtual void perform()0; virtual ~Animal() { cout<<"Animal的析构函数"<<endl; } }; class Lion :public Animal { public : void perform() { cout<<"狮子…

政务大厅人员睡岗离岗玩手机识别算法

人员睡岗离岗玩手机识别算法通过pythonyolo系列网络框架算法模型&#xff0c;人员睡岗离岗玩手机识别算法利用图像识别和行为分析&#xff0c;识别出睡岗、离岗和玩手机等不符合规定的行为&#xff0c;并发出告警信号以提醒相关人员。Python是一种由Guido van Rossum开发的通用…

2023年PMP最后一次考试,应该如何把握?

2023年PMP最后一次考试&#xff0c;应该如何把握? 免费送备考资料。 一、什么时间报名和考试&#xff1f; 根据国内PMP考试的主办方中国国际人才交流基金会的通知&#xff0c;2023年的PMP考试一共有三次&#xff0c;分别是5月、8月、11月&#xff0c;11月的具体考试时间以官…

Unity引擎修改模型顶点色的工具

大家好&#xff0c;我是阿赵。   之前分享过怎样通过MaxScript在3DsMax里面修改模型的顶点色。不过由于很多时候顶点色的编辑需要根据在游戏引擎里面的实际情况和shader的情况来动态调整&#xff0c;所以如果能在引擎里面直接修改模型的顶点色&#xff0c;将会方便很多。于是…

【力扣每日一题】2023.8.24 统计参与通信的服务器

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目顾名思义&#xff0c;要我们统计参与通信的服务器&#xff0c;给我们一个二维矩阵&#xff0c;元素为1的位置则表示是一台服务器。 …

input子系统

内核分三层 1、事件处理层 2、核心层 3、设备驱动层 当硬件按下&#xff0c;在设备驱动中触发中断&#xff0c;中断程序会将事件上报给核心层 核心层将事件给事件处理层&#xff0c;最后事件处理层控制app应用层的怎么操作将数据发送到用户空间

PIP 常用操作汇总

1. 升级 python -m pip install --upgrade pip2. 列出所有安装包 pip list3. 查找特定包 pip list | findstr xxx4. 查看特定包 pip show xxx5. 安装软件包 pip install pyzmq24.0.16. 卸载软件包 pip uninstall -y pyzmq7. 查看配置 # 生效的配置&#xff08;global -&…

lnmp架构-nginx

6.nginx基础配置 证书 重定向&#xff08;80重定向到443&#xff09; 当访问http时 直接到 https 自动索引&#xff1a; 下载方便 Nginx缓存配置 &#xff1a;缓存可以降低网站带宽&#xff0c;加速用户访问 日志轮询 禁用不必要的日志记录 以节省磁盘IO的消耗 监控的信息 监…

Java实现根据关键词搜索1688商品新品数据方法,1688API节课申请指南

要通过1688的API获取商品新品数据&#xff0c;您可以使用1688开放平台提供的接口来实现。以下是一种使用Java编程语言实现的示例&#xff0c;展示如何通过1688开放平台API获取商品新品数据&#xff1a; 首先&#xff0c;确保您已注册成为1688开放平台的开发者&#xff0c;并创…

2.3 【MySQL】命令行和配置文件中启动选项的区别

在命令行上指定的绝大部分启动选项都可以放到配置文件中&#xff0c;但是有一些选项是专门为命令行设计的&#xff0c;比方说defaults-extra-file 、 defaults-file 这样的选项本身就是为了指定配置文件路径的&#xff0c;再放在配置文件中使用就没啥意义了。 如果同一个启动选…

HUT23级训练赛

目录 A - tmn学长的字符串1 B - 帮帮神君先生 C - z学长的猫 D - 这题用来防ak E - 这题考察FFT卷积 F - 这题考察二进制 G - 这题考察高精度 H - 这题考察签到 I - 爱派克斯&#xff0c;启动! J - tmn学长的字符串2 K - 秋奕来买瓜 A - tmn学长的字符串1 思路&#x…

Vue项目中app.js过大,导致web初始化加载过慢问题

1、删除多余不需要的库&#xff1a; npm uninstall xxx 如例如moment库文件是很大的可以直接放到index.html文件直接CDN引入 2、修改/config/index.js配置文件&#xff1a;将productionGzip设置为false ​ 3、设置vue-router懒加载 懒加载配置&#xff1a; ​ 非懒加载配置&…

Spring security报栈溢出几种可能的情况

今天在运行spring security的时候&#xff0c;发现出现了栈溢出的情况&#xff0c;总结可能性如下&#xff1a; 1.UserDetailsService的实现类没有加上Service注入到容器中&#xff0c;导致容器循环寻找UserDetailsService的实现类&#xff0c;最终发生栈溢出的现象。 解决方法…

JavaSE学习——异常

目录 一、异常概述 二、异常的体系结果 二、异常的处理&#xff1a;抓抛模型 三、try-catch-finally的使用 四、throws 异常类型 的使用 五、开发中如何选择使用try-catch-finally还是使用throws&#xff1f; 六、自定义异常 自定义异常步骤&#xff1a; 七、总结&a…

Vue中使用ref($refs未定义underfined)

1. 调用地方 ref是在渲染之后才出现的 可以在mounted里面调用可以使用this.$nextTick(()>{})里面调用 2. 调用对象 ref不是响应式的&#xff0c;所以动态加载模板更新&#xff0c;ref无法更新&#xff0c;以下情况是出现underfined 结合v-if使用 …

复原20世纪复古修仙游戏

前言 在本教程中&#xff0c;我突发奇想&#xff0c;想做一个复古的修仙游戏&#xff0c;考虑到以前的情怀决定做个古老的躺平修仙游戏 &#x1f4dd;个人主页→数据挖掘博主ZTLJQ的主页 个人推荐python学习系列&#xff1a; ☄️爬虫JS逆向系列专栏 - 爬虫逆向教学 ☄️python…

适配器模式简介

概念&#xff1a; 适配器模式&#xff08;Adapter Pattern&#xff09;是一种结构型设计模式&#xff0c;用于将一个类的接口转换成客户端所期望的另一个接口。它允许不兼容的接口之间进行协同工作。 特点&#xff1a; 通过适配器&#xff0c;可以使原本因为接口不匹配而无法…

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍 Spark Streaming 是 Spark core API 的扩展&#xff0c;支持实时数据流的处理&#xff0c;并且具有可扩展&#xff0c; 高吞吐量&#xff0c;容错的特点。 数据可以从许多来源获取&#xff0c;如 Kafka &#xff0c; Flume &#xff0c; Kin…

全景图像生成算法

摘要 全景图像生成是计算机视觉领域的一个重要研究方向。本文对五种经典的全景图像生成算法进行综述&#xff0c;包括基于相机运动估计的算法、基于特征匹配的算法、基于图像切割的算法、基于多项式拟合的算法和基于深度学习的算法。通过对这些算法的原理、优缺点、适用场景等…