RabbitMQ 从入门到精通 (一)

目录

  • 1. 初识RabbitMQ
  • 2. AMQP
  • 3.RabbitMQ的极速入门
  • 4. Exchange(交换机)详解
    • 4.1 Direct Exchange
    • 4.2 Topic Exchange
    • 4.3 Fanout Exchange
  • 5. Message 消息

1. 初识RabbitMQ

RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用 Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的

RabbitMQ的优点:

  • 开源、性能优秀、稳定性保障
  • 提供可靠性消息投递模式(confirm)、返回模式(return)
  • 与SpringAMQP完美的整合、API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提下做到高可靠性、可用性

RabbitMQ官网

RabbitMQ的整体架构:

1348730-20190606002957380-2097750065.png

 
RabbitMQ的消息流转:

1348730-20190606002655965-1977548174.png

 

 

2. AMQP

AMQP全称: Advanced Message Queuing Protocol

AMQP翻译: 高级消息队列协议

AMQP定义: 是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

1348730-20190606002906491-408602073.png

 
 

AMQP核心概念:

  • Server:又称Broker,接受客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
  • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体的内容
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。同一个Virtual Host里面不能有相同名称的Exchange或Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

 

 

3.RabbitMQ的极速入门

后台启动: ./rabbitmq start &

关闭: ./rabbitmqctl stop

节点状态: ./rabbitmqctl status

管控台: http://ip:15672

 

 

RabbitMQ生产消费快速入门:

环境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依赖配置)

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency></dependencies>

 

public class Procuder {public static void main(String[] args) throws Exception {//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:指定交换机 不指定 则默认 (AMQP default交换机) 通过routingkey进行匹配 * props 消息属性* body 消息体*///4.通过Channel发送数据for(int i = 0; i < 5; i++){System.out.println("生产消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", null, msg.getBytes());}//5.记得关闭相关的连接channel.close();connection.close();}
}

 

public class Consumer {public static void main(String[] args) throws Exception{//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();//4. 声明创建一个队列String queueName = "test";/*** durable 是否持久化* exclusive 独占的  相当于加了一把锁*/channel.queueDeclare(queueName,true,false,false,null);//5.创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.设置channel/*** ACK: 当一条消息从生产端发到消费端,消费端接收到消息后会马上回送一个ACK信息给broker,告诉它这条消息收到了* autoack: * true  自动签收 当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。* false 手动签收 当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了* */channel.basicConsume(queueName, true, queueingConsumer);//7.获取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端:" + msg);//Envelope envelope = delivery.getEnvelope();}}
}

 

4. Exchange(交换机)详解

Exchange: 接收消息,并根据路由键转发消息所绑定的队列

1348730-20190606003024596-916792922.png

 

交换机属性:

  • Name: 交换机名称
  • Type: 交换机类型 diect、topic、fanout、headers
  • Durability: 是否需要持久化,true为持久化
  • AutoDelete: 当最后一个绑定到Exchange的队列删除后,自动删除该Exchange
  • Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为false (百分之99的情况默认为false 除非对Erlang语言较了解,做一些扩展)
  • Arguments: 扩展参数, 用于扩展AMQP协议可自定化使用

 

4.1 Direct Exchange

所有发送到Direct Exchange的消息被转发到RouteKey指定的Queue

注意:Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RoutingKey必须完全匹配才会被队列接收,否则该消息会被抛弃

1348730-20190606003044253-618019408.png

 

public class ProducerDirectExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_direct_exchange";String routingKey = "test.direct";//5.发送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());}
}

 

public class ConsumerDirectExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_direct_exchange";String exchangeType = "direct";String queueName = "test_direct_queue";String routingKey = "test.direct";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

 

4.2 Topic Exchange

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上

Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

1348730-20190606003103076-1591219841.png

注意:可以使用通配符进行匹配

符号 # 匹配一个或多个词

符号 * 匹配不多不少一个词

例如: "log.#" 能够匹配到 “log.info.oa”

​ "log.*" 只会匹配到 "log.err"

public class ProducerTopicExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_topic_exchange";String routingKey1 = "user.save";String routingKey2 = "user.update";String routingKey3 = "user.delete.abc";//5.发送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());}
}

 

public class ConsumerTopicExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_topic_exchange";String exchangeType = "topic";String queueName = "test_topic_queue";String routingKey = "user.#";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

 

4.3 Fanout Exchange

不处理路由键,只需要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
所以Fanout交换机转发消息是最快的

1348730-20190606003117689-979368743.png

 

public class ProducerFanoutExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_fanout_exchange";//5.发送for(int i = 0; i < 10 ; i++){String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, "", null, msg.getBytes());}channel.close();connection.close();}
}

 

public class ConsumerFanoutExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_fanout_exchange";String exchangeType = "fanout";String queueName = "test_topic_queue";//无需指定路由key String routingKey = "";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

 

5. Message 消息

服务器与应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成

常用属性:delivery mode、headers (自定义属性)

其他属性:content_type、content_encoding、priority、expiration

消息的properties属性用法示例:

public class Procuder {public static void main(String[] args) throws Exception {//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();Map<String,Object> headers = new HashMap<>();headers.put("my1", "111");headers.put("my2", "222");//10秒不消费 消息过期移除消息队列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();//4.通过Channel发送数据for(int i = 0; i < 5; i++){System.out.println("生产消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", properties, msg.getBytes());}//5.记得关闭相关的连接channel.close();connection.close();}
}

 

public class Consumer {public static void main(String[] args) throws Exception{//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();//4. 声明创建一个队列String queueName = "test";channel.queueDeclare(queueName,true,false,false,null);//5.创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.设置channelchannel.basicConsume(queueName, true, queueingConsumer);//7.获取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端:" + msg);Map<String, Object> headers = delivery.getProperties().getHeaders();System.err.println("headers value:" + headers.get("my1"));}}
}

转载于:https://www.cnblogs.com/dwlovelife/p/10982735.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/448802.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

接收并解析消息体传参、解析 json 参数

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 1.场景&#xff1a;postman 发送了一个 post 请求&#xff0c;如下&#xff1a; 2. 解析方式为用一个 vo 对象来接收 json。把 json 中的…

OpenCL memory object 之 传输优化

首先我们了解一些优化时候的术语及其定义&#xff1a; 1、deferred allocation&#xff08;延迟分配&#xff09;&#xff0c; 在第一次使用memory object传输数据时&#xff0c;runtime才对memory object真正分配空间。 这样减少了资源浪费&#xff0c;但第一次使用时要慢一些…

VBS使文本框的光标位于所有字符后

有时候在文本框里会显示一部分提示信息&#xff0c;用户在这些提示信息后面输入文本&#xff0c;但是将焦点设置于文本框后&#xff0c;光标总是在文本框的最前面&#xff0c; 用户输入的时候需要按"-->"键将光标移到最后才能输入&#xff0c;这样的操作很不爽。我…

AMD OpenCL 大学课程

AMD OpenCL大学课程是非常好的入门级OpenCL教程&#xff0c;通过看教程中的PPT&#xff0c;我们能够很快的了解OpenCL机制以及编程方法。下载地址&#xff1a;http://developer.amd.com/zones/OpenCLZone/universities/Pages/default.aspx 教程中的英文很简单&#xff0c;我相信…

47.QT-QChart之曲线图,饼状图,条形图使用

1.使用准备 在pro中, 添加QT charts 然后在界面头文件中添加头文件并声明命名空间,添加: #include <QtCharts> QT_CHARTS_USE_NAMESPACE 2.QChart之曲线图 绘制曲线图需要用到3个类 QSplineSeries: 用于创建有由一系列数据组成的曲线.类似的还有QPieSeries(饼图数据). Q…

Docker 部署应用、jar 工程 docker 方式部署

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 1. 把要部署的工程打成一个jar包。&#xff08;我的工程叫 gentle &#xff09; 打 jar 的方法&#xff1a;超简单方法&#xff1a; Int…

第二阶段冲刺(2)

1、整个项目预期的任务量 &#xff08;任务量 所有工作的预期时间&#xff09;和 目前已经花的时间 &#xff08;所有记录的 ‘已经花费的时间’&#xff09;&#xff0c;还剩余的时间&#xff08;所有工作的 ‘剩余时间’&#xff09; &#xff1b; 所有工作的预期时间&#…

华为路由器配置DHCP中继

DHCP(动态主机配置协议)理论知识&#xff1a;DHCP主要用来为客户机自动配置I P地址相关的网络参数&#xff0c;包括IP地址、子网掩码、默认网关、DNS服务器等。 DHCP 通信为广播的方式&#xff0c;因此当需要 DHCP 服务器为不同广播域&#xff08;路由或 VLAN 网段&#xff09;…

基于GPU的K-Means聚类算法

聚类是信息检索、数据挖掘中的一类重要技术&#xff0c;是分析数据并从中发现有用信息的一种有效手段。它将数据对象分组成为多个类或簇&#xff0c;使得在同一个簇中的对象之间具有较高的相似度&#xff0c;而不同簇中的对象差别很大。作为统计学的一个分支和一种无监督的学习…

GPU通用计算调研报告

摘要&#xff1a;NVIDIA公司在1999年发布GeForce256时首先提出GPU&#xff08;图形处理器&#xff09;的概念&#xff0c;随后大量复杂的应用需求促使整个产业蓬勃发展至今。GPU在这十多年的演变过程中&#xff0c;我们看到GPU从最初帮助CPU分担几何吞吐量&#xff0c;到Shader…

git 图形化工具 GitKraken 的使用 —— 分支的创建与合并

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 分支管理是Git工作流的重点 在之前的文章中通过GitKraken可以很清楚的看到&#xff0c;每一次commit&#xff0c;git把他们串成了一条线…

GitKraken - 简单教程

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 简单介绍&#xff1a;外观 GitKraken首页预览图 常用快捷键 模糊搜索&#xff1a;(cmd p) 在进行模糊搜索的时候会在当前页面弹出一个…

LeetCode刷题第二天——3Longest Substring Without repeating character 4 Median of Two Sorted Arrays...

混淆点&#xff1a; 子串 连续 子序列 可以不连续 知识点&#xff1a; HashMap&#xff1a; 出现问题&#xff1a; 1.使用unordered_map头文件时报错 #error This file requires compiler and library support for the ISO C 2011 standard. This support is currently experi…

【BZOJ 3339 / BZOJ 3585 / luogu 4137】Rmq Problem / mex

【原题题面】传送门 【题解大意】 都说了是莫队练习题。 考虑已知[l,r]区间的mex值时&#xff0c;如何求[l1,r]的mex值。 比较a[l1]与已知ans的大小&#xff0c;如果a[l1]>ans或者a[l1]<ans&#xff0c;均对答案没有影响。 如果a[l1]ans&#xff0c;考虑找到一个比当前an…

postman 无法正常返回结果 Could not get any response

在浏览器输入地址可以返回结果&#xff0c;但是由于返回的json没有格式&#xff0c;看起来比较麻烦&#xff0c;用postman却报错Could not get any response。 可以注意到下面写了可能的情况&#xff1a;比如服务器无响应&#xff08;由于浏览器可以访问&#xff0c;所以排除…

在Windows 下使用OpenCL

目前&#xff0c;NVIDIA和AMD的Windows driver均有支援OpenCL&#xff08;NVIDIA的正式版driver是从195.62版开始&#xff0c;而AMD则是从9.11版开始&#xff09;。NVIDIA的正式版driver中包含OpenCL.dll&#xff0c;因此可以直接使用。AMD到目前为止&#xff0c;则仍需要安装其…

[Swift]快速反向平方根 | Fast inverse square root

★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★➤微信公众号&#xff1a;山青咏芝&#xff08;shanqingyongzhi&#xff09;➤博客园地址&#xff1a;山青咏芝&#xff08;https://www.cnblogs.com/strengthen/&#xff09;➤GitHub地址&a…

适用于ATI卡的GPU计算MD5的小程序源码,基于AMD APP SDK开发

以下代码在win7 home basic , ati hd 5450平台测试通过&#xff0c;处理速度为每秒100万次。 程序很简单&#xff0c;只有一个main.cpp程序。Device端只有一个md5.cl文件。 下面我把代码贴出来&#xff0c;因为不能上传附件&#xff0c;我把完整工程包放到了242337476的群共享里…

【CentOS 7笔记11】,目录权限,所有者与所有组,隐藏权限#171022

2019独角兽企业重金招聘Python工程师标准>>> shallow丿ove 一. 文件或目录权限change mode r4&#xff0c;w2&#xff0c;x1 selinux开启则权限后面会有个. 更改SElinux配置文件&#xff0c;将永久关闭SElinux [rootlocalhost ~]# vi /etc/selinux/config #将默认…

python字符编码与转码

详细文章: http://www.cnblogs.com/yuanchenqi/articles/5956943.html http://www.diveintopython3.net/strings.html 需知: 1.在python2默认编码是ASCII, python3里默认是unicode 2.unicode 分为 utf-32(占4个字节),utf-16(占两个字节)&#xff0c;utf-8(占1-4个字节)&#xf…