RabbitMQ教程总结

【译】RabbitMQ教程一

  • 主要通过Hello Word对RabbitMQ有初步认识

【译】RabbitMQ教程二

  • 工作队列,即一个生产者对多个消费者
  • 循环分发、消息确认、消息持久、公平分发

【译】RabbitMQ教程三

  • 如何同一个消息同时发给多个消费者
  • 开始引入RabbitMQ消息模型中的重要概念路由器Exchange以及绑定等
  • 使用了fanout类型的路由器

【译】RabbitMQ教程四

  • 如何选择性地接收消息
  • 使用了direct路由器

【译】RabbitMQ教程五

  • 如何通过多重标准接收消息
  • 使用了topic路由器,可通过灵活的路由键和绑定键的设置,
    进一步增强消息选择的灵活性

【译】RabbitMQ教程六

  • 如何使用RabbitMQ实现一个简单的RPC系统
  • 回调队列callback queue和关联标识correlation id

各教程代码

  • 官方:GitHub rabbitmq-tutorials
  • 我整理的:rabbitmq-tutorial-java

RabbitMQ 一般工作流程

生产者和RabbitMQ服务器建立连接和通道,声明路由器,同时为消息设置路由键,这样,所有的消息就会以特定的路由键发给路由器,具体路由器会发送到哪个或哪几个队列,生产者在大部分场景中都不知道。(1个路由器,但不同的消息可以有不同的路由键)。
消费者和RabbitMQ服务器建立连接和通道,然后声明队列,声明路由器,然后通过设置绑定键(或叫路由键)为队列和路由器指定绑定关系,这样,消费者就可以根据绑定键的设置来接收消息。(1个路由器,1个队列,但不同的消费者可以设置不同的绑定关系)。


主要方法

  • 声明队列(创建队列):可以生产者和消费者都声明,也可以消费者声明生产者不声明,也可以生产者声明而消费者不声明。最好是都声明。(生产者未声明,消费者声明这种情况如果生产者先启动,会出现消息丢失的情况,因为队列未创建)
    channel.queueDeclare(String queue, //队列的名字boolean durable, //该队列是否持久化(即是否保存到磁盘中)boolean exclusive,//该队列是否为该通道独占的,即其他通道是否可以消费该队列boolean autoDelete,//该队列不再使用的时候,是否让RabbitMQ服务器自动删除掉Map<String, Object> arguments)//其他参数
  • 声明路由器(创建路由器):生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。
    channel.exchangeDeclare(String exchange,//路由器的名字String type,//路由器的类型:topic、direct、fanout、headerboolean durable,//是否持久化该路由器boolean autoDelete,//是否自动删除该路由器boolean internal,//是否是内部使用的,true的话客户端不能使用该路由器Map<String, Object> arguments) //其他参数
  • 绑定队列和路由器:只用在消费者

    channel.queueBind(String queue, //队列String exchange, //路由器String routingKey, //路由键,即绑定键Map<String, Object> arguments) //其他绑定参数
  • 发布消息:只用在生产者

    channel.basicPublish(String exchange, //路由器的名字,即将消息发到哪个路由器String routingKey, //路由键,即发布消息时,该消息的路由键是什么BasicProperties props, //指定消息的基本属性byte[] body)//消息体,也就是消息的内容,是字节数组
    • BasicProperties props:指定消息的基本属性,如deliveryMode为2时表示消息持久,2以外的值表示不持久化消息
      //BasicProperties介绍
      String corrId = "";
      String replyQueueName = "";
      Integer deliveryMode = 2;
      String contentType = "application/json";
      AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).deliveryMode(deliveryMode).contentType(contentType).build();
  • 接收消息:只用在消费者
    channel.basicConsume(String queue, //队列名字,即要从哪个队列中接收消息boolean autoAck, //是否自动确认,默认trueConsumer callback)//消费者,即谁接收消息
    • 消费者中一般会有回调方法来消费消息
      Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, //该消费者的标签Envelope envelope,//字面意思为信封:packaging data for the messageAMQP.BasicProperties properties, //message content header data byte[] body) //message bodythrows IOException {//获取消息示例String message = new String(body, "UTF-8");//接下来就可以根据消息处理一些事情}};

路由器类型

  • fanout:会忽视绑定键,每个消费者都可以接受到所有的消息(前提是每个消费者都要有各自单独的队列,而不是共有同一队列)。
  • direct:只有绑定键和路由键完全匹配时,才可以接受到消息。
  • topic:可以设置多个关键词作为路由键,在绑定键中可以使用*#来匹配
  • headers:(可以忽视它的存在)

教程一 HelloWorld

看主要代码

//生产者
channel.queueDeclare(QUEUE_NAME, false, false, false, null); ----①
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
//消费者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, consumer);

这里,生产者和消费者都没有声明路由器,而是声明了同名的队列。生产者发布消息时,使用了默认的无名路由器(""),并以队列的名字作为了路由键。消费者在消费时,由于没有声明路由器,这并不表示没有路由器的存在,消费者此时使用的是默认的路由器,即Default exchange,该路由器和所有的队列都进行绑定,并且使用队列的名字作为了路由键进行绑定。所以,生产者使用默认路由器以队列的名字作为了绑定键进行了消息发布,而消费者也使用了默认的路由器,并以队列的名字作为绑定键进行了绑定。而默认路由器是direct类型,路由键和绑定键完全匹配时,消费者才能接受到消息,所以教程1中的消费者可以接收到消息。(为了认证这一点,可以将代码①去掉,然后先运行消费者,让它等待监听,然后启动生产者,发送消息,消费者同样会收到消息。这里的生产者声明队列,只是让RabbitMQ服务器先创建这个队列,以免发送的消息因为找不到队列而丢失。)


教程二 Work Queues

看主要代码

//生产者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "1.";
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//消费者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);---①...channel.basicAck(envelope.getDeliveryTag(), false);...---③
boolean autoAck = false;---②
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

这里也使用了默认的direct路由器。假如启动多个工作者(消费者),按道理这些工作者应该可以接收到所有的消息啊,但是不要忘了这几个工作者都是从同一个队列中取消息,消息取出一个,队列中就少一个,所以每个工作者都只是收到的消息的一部分。既然这几个工作者都从同一个队列中取消息,那每个工作者应该怎么取呢?

如果没有代码①,并且②设置为true,即自动确认收到消息,RabbitMQ只要发出消息就认为消费者收到了,此时RabbitMQ采取的是循环分发的策略,在这几个工作者中循环轮流分发消息。每个工作者接受到的消息数量都是相同的。
如果有代码①,并且②设置为false,则RabbitMQ会采取公平分发策略,即将消息发给空闲的工作者(空闲,工作者将消息处理完毕,执行了代码③;不空闲,即工作者还在处理消息,还没有给RabbitMQ发回确认信息,即还没有执行代码③)。
代码①中的参数1:(prefetchCount)maximum number of messages that the server will deliver

为了防止队列丢失,在声明队列的时候指定了durabletrue。为了防止消息丢失,设置了消息属性BasicPropertiesMessageProperties.PERSISTENT_TEXT_PLAIN,让我们看看值是什么:


MessageProperties.PERSISTENT_TEXT_PLAIN

可以看出里面包含了deliveryMode=2。从这张图也可以看到BasicProperties属性的全貌。

如果想让多个消费者共同消费某些消息,只要让他们共用同一队列即可(当然前提是你得保证消息可以都进到这个队列中来,如本例中使用direct路由器,消息的路由键和队列的绑定键设为一致,当然也可以使用fanout路由器,路由键和绑定键随意设置,不一致也能收到,因为fanout路由器会忽略路由键的设置)。


教程三 Publish/Subscribe

看主要代码

 //生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();---①
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicConsume(queueName, true, consumer);

教程三才引出路由器的概念。生产者和消费者声明了同样的路由,并指明路由类型为fanout,该路由器会忽视路由键,将消息发布到所有绑定的队列中(仍需要绑定,只是绑定时绑定键任意就行了)。
假如启动多个消费者,因为代码①中调用无参的声明去恶劣方法channel.queueDeclare(),就会创建了一个非持久、独特的、自动删除的队列,并返回一个自动生成的名字。所以多个消费者取消息时使用的是各自的队列,不会存在多个消费者从同一个队列取消息的情况。
这样多个消费者就可以接收到同一消息。

如果想实现多个消费者都可以接收到所有的消息,只要让他们各自使用单独的队列即可(当然前提是保证路由键和绑定键的设置可以让消息都进入到队列,如本例中使用fanout路由器,无需考虑绑定键和路由键)。


教程4 Routing

看主要代码:

//生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
String[] severities = {"info", "warning", "error"};
for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);
} 
channel.basicConsume(queueName, true, consumer);

可以看出,教程3使用了direct路由器,该路由器的特点是可以设定路由键和绑定键,消费者只能从队列中取出两者匹配的消息。
在生产者发消息时,为消息设置不同的路由键(如例子中severity可以设为infowarnerror)。
消费者在通过为队列设置多个绑定关系,来选择想要接收的消息。
这里有一个概念叫做多重绑定,即多个队列以相同的绑定键binding key绑定到同一个路由器上,此时direct路由器就会像fanout路由器一样,将消息广播给所有符合路由规则的队列。


教程5 Topics

看主要代码:

//生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = "";
String message = "msg...";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
//消费者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
String bingingKeys[] = {""};for (String bindingKey : bingingKeys) {channel.queueBind(queueName, EXCHANGE_NAME,   bindingKey);}
channel.basicConsume(queueName, true, consumer);

这里使用了topic路由器,它与direct路由器类似,不同在于,topic路由器可以为路由键设置多重标准。一个消息有一个路由键,direct路由器只能为路由键指定一个关键字,但是topic路由器可以在路由键中通过点号分割多个单词来组成路由键,消费者在绑定的时候,可以设置多重标准来选择接受。
举个例子:假如日志根据严重级别infowarnerror,也可以根据来源分为cronkernauth。某个日志消息设置路由键为kern.info,表示来自kerninfo级别的日志。想要选择接收消息的时候,direct路由器就办不到,它要么可以根据严重级别来筛选,要么根据来源来筛选,而topic路由器则可以轻松应对,只要将绑定键设置为kern.info就可以精准获取该类型的日志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习实战(笔记)------------KNN算法

1.KNN算法 KNN算法即K-临近算法&#xff0c;采用测量不同特征值之间的距离的方法进行分类。 以二维情况举例&#xff1a; 假设一条样本含有两个特征。将这两种特征进行数值化&#xff0c;我们就可以假设这两种特种分别为二维坐标系中的横轴和纵轴&#xff0c;将一个样本以点的形…

Java注解Annotation 完成验证

Java注解Annotation用起来很方便&#xff0c;也越来越流行&#xff0c;由于其简单、简练且易于使用等特点&#xff0c;很多开发工具都提供了注解功能&#xff0c;不好的地方就是代码入侵比较严重&#xff0c;所以使用的时候要有一定的选择性。 这篇文章将利用注解&#xff0c;来…

隐藏马尔科夫模型HMM

概率图模型 HMM 先从一个具体的例子入手,看看我们要解决的实际问题.例子引自wiki.https://en.wikipedia.org/wiki/Hidden_Markov_model Consider two friends, Alice and Bob, who live far apart from each other and who talk together daily over the telephone about what …

阿里云天池 金融风控训练营Task1 广东工业站

Task1 赛题理解 一、学习知识点概要 本次学习先是介绍了赛题的背景和概况&#xff0c;题目以金融风控中的个人信贷为背景&#xff0c;给所给的47列特征中&#xff0c;根据贷款申请人的数据信息预测其是否有违约的可能&#xff0c;以此判断是否通过贷款。随后介绍了比赛中的评…

如何将.crt的ssl证书文件转换成.pem格式

如何将.crt的ssl证书文件转换成.pem格式摘自&#xff1a;https://www.landui.com/help/show-8127 2018-07-04 14:55:41 2158次 准备:有一台安装了php的linux操作系统执行下面的openssl命令即可&#xff1a;openssl x509 -in www.xx.com.crt -out www.xx.com.pem转载于:https://…

SpringMVC学习记录--Validator验证分析

一.基于Validator接口的验证. 首先创建User实例,并加入几个属性 ?12345678910111213141516171819202122232425262728293031323334<code class"hljs cs">public class User {private String username;private String password;private String nickname;public …

C# 获取句柄程序

这个小程序需要用到系统API&#xff0c;也就是需要用到user32中的三个函数。 第一个&#xff1a;WindowFromPoint 返回一个窗口句柄 第二个&#xff1a;GetWindowText 获取窗口标题 第三个&#xff1a;GetClassName 获取类名 当然&#xff0c;最重要的一点就是要引用命名空间…

centos7安装oracle12c 一

本文 基本参考了下面这篇文章http://blog.csdn.net/gq5251/article/details/42004035 和http://www.linuxidc.com/Linux/2017-08/146528.htm 但是改正了一些错误操作系统:CentOS Linux release 7.2.1511 (Core) oracle: oarcle (12.1.0.2.0) - Standard Edition (SE2)几点要注…

阿里云天池 Python训练营Task4: Python数据分析:从0完成一个数据分析实战 学习笔记

本学习笔记为阿里云天池龙珠计划Python训练营的学习内容&#xff0c;学习链接为&#xff1a;https://tianchi.aliyun.com/specials/promotion/aicamppython?spm5176.22758685.J_6770933040.1.6f103da1tESyzu 一、学习知识点概要 本次主要通过阿里云天池的赛题【Python入门系…

JMETER从JSON响应中提取数据

如果你在这里&#xff0c;可能是因为你需要使用JMeter从Json响应中提取变量。 好消息&#xff01;您正在掌握掌握JMeter Json Extractor的权威指南。作为Rest API测试指南的补充&#xff0c;您将学习掌握Json Path Expressions 所需的一切。 我们走吧&#xff01;并且不要惊慌&…

centos7安装oracle12c 二

环境&#xff1a;CentOS7VMware12&#xff0c;分配资源&#xff1a;CPU&#xff1a;2颗&#xff0c;内存&#xff1a;4GB&#xff0c;硬盘空间&#xff1a;30GB Oracle 12C企业版64位 下载地址&#xff1a;http://www.oracle.com/technetwork/database/enterprise-edition/down…

阿里云天池 Python训练营Task5:Python训练营测试 学习笔记

一、学习知识点概要 本次是Python训练营的测试&#xff0c;在45分钟内完成25题&#xff0c;满分100分及格80分。题目主要考察Task1到Task3里面的Python基础知识。在我随到的25道题里&#xff0c;知识点有&#xff1a; 变量&#xff08;包括数据类型和容器类型&#xff09;运算…

centos7安装oracle12c 三

场景描述&#xff1a;我在自己电脑的虚拟机上linux环境下安装oracle11g数据库。 Linux版本为&#xff1a;CentOS release 6.8 (Final)&#xff0c;Oracle版本为&#xff1a;linux.x64_11gR2 问题描述&#xff1a;在oracle安装到Prerequisite Checks这一步的时候&#xff0c;出现…

《属性数据分析引论》 部分课后习题R语言实践(第三章、第四章)

目录 前言 第三章 广义线性模型 习题3.18 a小题 b小题 c小题 d小题 习题3.19 a小题 b小题 c小题 第四章 Logistic回归 习题4.1 a小题 b小题 c小题 d小题 e小题 习题4.2 a小题 b小题 c小题 d小题 小结 前言 习题选自高等教育出版社译制&#xff0c;Alan A…

Linux下SVN搭建

在Linux系统中搭建svn服务所需要用到的软件叫做subversion&#xff0c;可以通过yum来进行安装&#xff0c;如图 安装好软件后第一件事就是创建一个仓库目录 [rootserver1 ~]# mkdir /svn 使用svn自带命令建立仓库 [rootserver1 ~]# svnadmin create /svn 进入该仓库&#xff0c…

可用于 线性判别、聚类分析 的R语言函数总结

一、判别分析 判别分析是一种分类技术&#xff0c;其通过一个已知类别的“训练样本”来建立判别准则&#xff0c;并通过预测变量来为未知类别的数据进行分类。根据判别的模型分为线性判别和非线性判别&#xff0c;线性判别中根据判别准则又分为Fisher判别&#xff0c;Bayes判别…

Android APK 打包过程 MD

Markdown版本笔记我的GitHub首页我的博客我的微信我的邮箱MyAndroidBlogsbaiqiantaobaiqiantaobqt20094baiqiantaosina.comAndroid APK 打包流程 MD 目录 目录APK 的打包流程整体流程资源的编译和打包资源ID资源索引概况具体打包过程aapt阶段aidl阶段Java Compiler阶段dex阶段a…

可用于 主成分分析、R型因子分析、简单相应分析 的R语言函数总结

一、主成分分析 主成分分析是多元统计分析的一种常用的降维方法&#xff0c;它以尽量少的信息损失&#xff0c;最大程度将变量个数减少&#xff0c;且彼此间互不相关。提取出来的新变量成为主成分&#xff0c;主成分是原始变量的线性组合。 1.1 KMO检验和Bartlett球形检验 在…

持续集成之Jenkins安装部署

安装JDKJenkins是Java编写的&#xff0c;所以需要先安装JDK&#xff0c;这里采用yum安装&#xff0c;如果对版本有需求&#xff0c;可以直接在Oracle官网下载JDK。 [rootlinux-node1 ~]# yum install -y java-1.8.0 安装Jekins [rootlinux-node1 ~]# cd /etc/yum.repos.d/ […

jenkins svn tomcat ant自动部署

Jenkins Jenkins是基于Java开发的一种持续集成工具&#xff0c;用于监控持续重复的工作&#xff0c;功能包括&#xff1a; 1、持续的软件版本发布/测试项目。 2、监控外部调用执行的工作。 跟其他持续集成相比&#xff0c;它的主要优点有&#xff1a; 开源&#xff0c;即免…