RabbitMQ 从入门到精通 (一)

目录

  • 1. 初识RabbitMQ
  • 2. AMQP
  • 3.RabbitMQ的极速入门
  • 4. Exchange(交换机)详解
    • 4.1 Direct Exchange
    • 4.2 Topic Exchange
    • 4.3 Fanout Exchange
  • 5. Message 消息

1. 初识RabbitMQ

RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用 Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的

RabbitMQ的优点:

  • 开源、性能优秀、稳定性保障
  • 提供可靠性消息投递模式(confirm)、返回模式(return)
  • 与SpringAMQP完美的整合、API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提下做到高可靠性、可用性

RabbitMQ官网

RabbitMQ的整体架构:

1348730-20190606002957380-2097750065.png

 
RabbitMQ的消息流转:

1348730-20190606002655965-1977548174.png

 

 

2. AMQP

AMQP全称: Advanced Message Queuing Protocol

AMQP翻译: 高级消息队列协议

AMQP定义: 是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

1348730-20190606002906491-408602073.png

 
 

AMQP核心概念:

  • Server:又称Broker,接受客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
  • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体的内容
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。同一个Virtual Host里面不能有相同名称的Exchange或Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

 

 

3.RabbitMQ的极速入门

后台启动: ./rabbitmq start &

关闭: ./rabbitmqctl stop

节点状态: ./rabbitmqctl status

管控台: http://ip:15672

 

 

RabbitMQ生产消费快速入门:

环境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依赖配置)

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency></dependencies>

 

public class Procuder {public static void main(String[] args) throws Exception {//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:指定交换机 不指定 则默认 (AMQP default交换机) 通过routingkey进行匹配 * props 消息属性* body 消息体*///4.通过Channel发送数据for(int i = 0; i < 5; i++){System.out.println("生产消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", null, msg.getBytes());}//5.记得关闭相关的连接channel.close();connection.close();}
}

 

public class Consumer {public static void main(String[] args) throws Exception{//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();//4. 声明创建一个队列String queueName = "test";/*** durable 是否持久化* exclusive 独占的  相当于加了一把锁*/channel.queueDeclare(queueName,true,false,false,null);//5.创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.设置channel/*** ACK: 当一条消息从生产端发到消费端,消费端接收到消息后会马上回送一个ACK信息给broker,告诉它这条消息收到了* autoack: * true  自动签收 当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。* false 手动签收 当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了* */channel.basicConsume(queueName, true, queueingConsumer);//7.获取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端:" + msg);//Envelope envelope = delivery.getEnvelope();}}
}

 

4. Exchange(交换机)详解

Exchange: 接收消息,并根据路由键转发消息所绑定的队列

1348730-20190606003024596-916792922.png

 

交换机属性:

  • Name: 交换机名称
  • Type: 交换机类型 diect、topic、fanout、headers
  • Durability: 是否需要持久化,true为持久化
  • AutoDelete: 当最后一个绑定到Exchange的队列删除后,自动删除该Exchange
  • Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为false (百分之99的情况默认为false 除非对Erlang语言较了解,做一些扩展)
  • Arguments: 扩展参数, 用于扩展AMQP协议可自定化使用

 

4.1 Direct Exchange

所有发送到Direct Exchange的消息被转发到RouteKey指定的Queue

注意:Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RoutingKey必须完全匹配才会被队列接收,否则该消息会被抛弃

1348730-20190606003044253-618019408.png

 

public class ProducerDirectExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_direct_exchange";String routingKey = "test.direct";//5.发送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());}
}

 

public class ConsumerDirectExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_direct_exchange";String exchangeType = "direct";String queueName = "test_direct_queue";String routingKey = "test.direct";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

 

4.2 Topic Exchange

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上

Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

1348730-20190606003103076-1591219841.png

注意:可以使用通配符进行匹配

符号 # 匹配一个或多个词

符号 * 匹配不多不少一个词

例如: "log.#" 能够匹配到 “log.info.oa”

​ "log.*" 只会匹配到 "log.err"

public class ProducerTopicExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_topic_exchange";String routingKey1 = "user.save";String routingKey2 = "user.update";String routingKey3 = "user.delete.abc";//5.发送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());}
}

 

public class ConsumerTopicExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_topic_exchange";String exchangeType = "topic";String queueName = "test_topic_queue";String routingKey = "user.#";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

 

4.3 Fanout Exchange

不处理路由键,只需要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
所以Fanout交换机转发消息是最快的

1348730-20190606003117689-979368743.png

 

public class ProducerFanoutExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_fanout_exchange";//5.发送for(int i = 0; i < 10 ; i++){String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, "", null, msg.getBytes());}channel.close();connection.close();}
}

 

public class ConsumerFanoutExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_fanout_exchange";String exchangeType = "fanout";String queueName = "test_topic_queue";//无需指定路由key String routingKey = "";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

 

5. Message 消息

服务器与应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成

常用属性:delivery mode、headers (自定义属性)

其他属性:content_type、content_encoding、priority、expiration

消息的properties属性用法示例:

public class Procuder {public static void main(String[] args) throws Exception {//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();Map<String,Object> headers = new HashMap<>();headers.put("my1", "111");headers.put("my2", "222");//10秒不消费 消息过期移除消息队列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();//4.通过Channel发送数据for(int i = 0; i < 5; i++){System.out.println("生产消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", properties, msg.getBytes());}//5.记得关闭相关的连接channel.close();connection.close();}
}

 

public class Consumer {public static void main(String[] args) throws Exception{//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();//4. 声明创建一个队列String queueName = "test";channel.queueDeclare(queueName,true,false,false,null);//5.创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.设置channelchannel.basicConsume(queueName, true, queueingConsumer);//7.获取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端:" + msg);Map<String, Object> headers = delivery.getProperties().getHeaders();System.err.println("headers value:" + headers.get("my1"));}}
}

转载于:https://www.cnblogs.com/dwlovelife/p/10982735.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/448802.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

接收并解析消息体传参、解析 json 参数

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 1.场景&#xff1a;postman 发送了一个 post 请求&#xff0c;如下&#xff1a; 2. 解析方式为用一个 vo 对象来接收 json。把 json 中的…

OpenCL memory object 之 传输优化

首先我们了解一些优化时候的术语及其定义&#xff1a; 1、deferred allocation&#xff08;延迟分配&#xff09;&#xff0c; 在第一次使用memory object传输数据时&#xff0c;runtime才对memory object真正分配空间。 这样减少了资源浪费&#xff0c;但第一次使用时要慢一些…

VBS使文本框的光标位于所有字符后

有时候在文本框里会显示一部分提示信息&#xff0c;用户在这些提示信息后面输入文本&#xff0c;但是将焦点设置于文本框后&#xff0c;光标总是在文本框的最前面&#xff0c; 用户输入的时候需要按"-->"键将光标移到最后才能输入&#xff0c;这样的操作很不爽。我…

记录ionic 最小化应用时所遇的问题

ionic3与ionic4最小化插件安装不一样&#xff1a; ionic3安装方法&#xff1a; $ ionic cordova plugin add cordova-plugin-appminimize $ npm install --save ionic-native/app-minimize4 并在app.module.ts中 注入依赖&#xff1a; import { AppMinimize } from ionic-nativ…

解决 --- Docker 启动时报错:iptables:No chain/target/match by the name

问题&#xff1a;jenkins的docker containner启动失败&#xff0c;报错&#xff1a;failed programming external connectivity … iptables: No chain/target/match by that name” docker 服务启动的时候&#xff0c;docker服务会向iptables注册一个链&#xff0c;以便让dock…

AMD OpenCL 大学课程

AMD OpenCL大学课程是非常好的入门级OpenCL教程&#xff0c;通过看教程中的PPT&#xff0c;我们能够很快的了解OpenCL机制以及编程方法。下载地址&#xff1a;http://developer.amd.com/zones/OpenCLZone/universities/Pages/default.aspx 教程中的英文很简单&#xff0c;我相信…

第一篇 计算机基础

1.什么是编程语言 python和中文、英语一样、都是一门语言&#xff0c;只要是语言&#xff0c;其实就库看成是一种事物与另一种事物沟通的介质。python属于编程语言&#xff0c;编程语言是程序员与计算机之间沟通的介质&#xff1b;中文和英文则是人与人之间沟通的介质。 2.什么…

47.QT-QChart之曲线图,饼状图,条形图使用

1.使用准备 在pro中, 添加QT charts 然后在界面头文件中添加头文件并声明命名空间,添加: #include <QtCharts> QT_CHARTS_USE_NAMESPACE 2.QChart之曲线图 绘制曲线图需要用到3个类 QSplineSeries: 用于创建有由一系列数据组成的曲线.类似的还有QPieSeries(饼图数据). Q…

Docker 部署应用、jar 工程 docker 方式部署

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 1. 把要部署的工程打成一个jar包。&#xff08;我的工程叫 gentle &#xff09; 打 jar 的方法&#xff1a;超简单方法&#xff1a; Int…

流浪不是我的初衷 ... ...

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 或许&#xff0c;我从来就是一个习惯沉默的人 ... 或许&#xff0c;我从来就不善于倾述 ... 会有难过的时候&#xff0c;会有觉得累的…

第二阶段冲刺(2)

1、整个项目预期的任务量 &#xff08;任务量 所有工作的预期时间&#xff09;和 目前已经花的时间 &#xff08;所有记录的 ‘已经花费的时间’&#xff09;&#xff0c;还剩余的时间&#xff08;所有工作的 ‘剩余时间’&#xff09; &#xff1b; 所有工作的预期时间&#…

VS2008+OpenCL环境配置

1. 配置.cl文件支持: 1.1. 打开VS2008&#xff0c; 工具->选项->文本编辑器->文件扩展名&#xff0c;添加一个新的扩展名&#xff0c;指定编辑器为Microsoft Visual C 。这样在OpenCL文件中就能显示C的语法高亮了。 1.2. 配置OpenCL语法高亮 - 打开目录~\NVIDIA Corpo…

第十二周学习进度报告

代码时间&#xff1a;17小时左右&#xff0c; 代码量&#xff1a;300行左右&#xff0c; 阅读&#xff1a;一个app的诞生20页&#xff1b;构建之法30页 知识&#xff1a;抽象典型用户&#xff08;具有代表性&#xff09;和场景&#xff0c;去设计相应功能。 转载于:https://www…

我的桃花源

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 看了一个动画片&#xff08;《猫与桃花源》&#xff09;&#xff0c;画风和内容并不是我最偏好的... 但故事结尾的旁白和歌曲却打动了一…

promise实例

不废话&#xff0c;粘代码 function ajax(method, url, data) {let request new XMLHttpRequest();return new Promise(function (resolve, reject) {request.onreadystatechange function () {if (request.readyState 4) {if (request.status 200) {resolve(request.respo…

华为路由器配置DHCP中继

DHCP(动态主机配置协议)理论知识&#xff1a;DHCP主要用来为客户机自动配置I P地址相关的网络参数&#xff0c;包括IP地址、子网掩码、默认网关、DNS服务器等。 DHCP 通信为广播的方式&#xff0c;因此当需要 DHCP 服务器为不同广播域&#xff08;路由或 VLAN 网段&#xff09;…

基于GPU的K-Means聚类算法

聚类是信息检索、数据挖掘中的一类重要技术&#xff0c;是分析数据并从中发现有用信息的一种有效手段。它将数据对象分组成为多个类或簇&#xff0c;使得在同一个簇中的对象之间具有较高的相似度&#xff0c;而不同簇中的对象差别很大。作为统计学的一个分支和一种无监督的学习…

IntelliJ IDEA 工具篇之如何切换 git 分支

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 1、进入项目和工程。 2、点击右下角的git:master&#xff0c;然后选择origin/master&#xff0c;然后选择你要切换的分支&#xff0c;我…

IDEA---SpringBoot同一个项目多端口启动

-Dserver.port xxxx 转载于:https://www.cnblogs.com/tonyzt/p/10987116.html

好程序员Web前端分享无法忽视的JavaScript技巧

好程序员Web前端分享无法忽视的JavaScript技巧。在大家从事web前端的工作中&#xff0c;很容易忽视一些JavaScript的小技巧&#xff0c;今天为大家总结了一些容易被大家忽略的技巧&#xff0c;希望能够对大家有所帮助。1、过滤唯一值Set类型是在ES6中新增的&#xff0c;它类似于…