spring集成RabbitMQ配置文件详解(生产者和消费者)

1,首先引入配置文件org.springframework.amqp,如下:

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.7.1.RELEASE</version></dependency>

2,准备工作:安装好rabbitmq,并在项目中增加配置文件   rabbit.properties 内容如下:

rmq.ip=192.188.113.114   
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin

3,rabbitmq属性介绍:

概念解释:

Brocker:消息队列服务器实体。Exchange:消息交换机,指定消息按什么规则,路由到哪个队列。Queue:消息队列,每个消息都会被投入到一个或者多个队列里。Binding:绑定,它的作用是把exchange和queue按照路由规则binding起来。Routing Key:路由关键字,exchange根据这个关键字进行消息投递。Virtual Host: 虚拟主机,一个broker里可以开设多个vhost,用作不用用户的权限分离。每个virtual host本质上都是一个RabbitMQ Server(但是一个server中可以有多个virtual host),拥有它自己若干的个Exchange、Queue和bings rule等等。其实这是一个虚拟概念,类似于权限控制组。Virtual Host是权限控制的最小粒度。Producer:消息生产者,就是投递消息的程序。Consumer:消息消费者,就是接受消息的程序。Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。接下来的实践案例中我们就可以看到,producer和consumer与exchange的通信的前提是先建立TCP连接。仅仅创建了TCP连接,producer和consumer与exchange还是不能通信的。我们还需要为每一个Connection创建Channel。Channel: 它是建立在上述TCP连接之上的虚拟连接。数据传输都是在Channel中进行的。AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。有人要问了,为什么要使用Channel呢,直接用TCP连接不就好了么?对于一个消息服务器来说,它的任务是处理海量的消息,当有很多线程需要从RabbitMQ中消费消息或者生产消息,那么必须建立很多个connection,也就是许多个TCP连接。然而对于操作系统而言,建立和关闭TCP连接是非常昂贵的开销,而且TCP的连接数也有限制,频繁的建立关闭TCP连接对于系统的性能有很大的影响,如果遇到高峰,性能瓶颈也随之显现。RabbitMQ采用类似NIO的做法,选择TCP连接服用,不仅可以减少性能开销,同时也便于管理。在TCP连接中建立Channel是没有上述代价的,可以复用TCP连接。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。有实验表明,在Channel中,1秒可以Publish10K的数据包。对于普通的Consumer或者Producer来说,这已经足够了。除非有非常大的流量时,一个connection可能会产生性能瓶颈,此时就需要开辟多个connection。

消息队列的使用过程大概如下:
消息接收

客户端连接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间建立好绑定关系。

消息发布

客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

AMQP 里主要要说两个组件:

Exchange 和 Queue
绿色的X就是Exchange ,红色的是Queue,这两者都在Server端,又称作Broker
这部分是RabbitMQ实现的,而蓝色的则是客户端,通常有Producer和Consumer两种类型。

 

4,配置spring-rabbitmq.xml,内容如下:

<!-- 公共部分 -->
<!-- 创建连接类 连接安装好的 rabbitmq -->
<bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"><constructor-arg value="localhost" />    <!-- username,访问RabbitMQ服务器的账户,默认是guest --><property name="username" value="${rmq.manager.user}" /><!-- username,访问RabbitMQ服务器的密码,默认是guest -->   <property name="password" value="${rmq.manager.password}" /><!-- host,RabbitMQ服务器地址,默认值"localhost" -->   <property name="host" value="${rmq.ip}" />   <!-- port,RabbitMQ服务端口,默认值为5672 --><property name="port" value="${rmq.port}" /><!-- channel-cache-size,channel的缓存数量,默认值为25 --><property name="channel-cache-size" value="50" /><!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) --><property name="cache-mode" value="CHANNEL" />   
</bean>
<!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例
<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
<rabbit:admin connection-factory="connectionFactory"/><!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
<rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" /><!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
<rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false"><rabbit:bindings><rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding><rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding><rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding></rabbit:bindings>
</rabbit:fanout-exchange><!-- 生产者部分 -->
<!-- 发送消息的producer类,也就是生产者 -->
<bean id="msgProducer" class="com.asdf.sdf.ClassA"><!-- value中的值就是producer中的的routingKey,也就是队列名称,它与上面的rabbit:bindings标签中的key必须相同 --><property name="queueName" value="{alert.queue.1}"/>
</bean><!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
<!-- 或者配置jackson -->
<!--
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
--><rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /><!-- 消费者部分 -->
<!-- 自定义接口类 -->
<bean id="testHandler" class="com.rabbit.TestHandler"></bean><!-- 用于消息的监听的代理类MessageListenerAdapter -->
<bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" ><!-- 类名 --><constructor-arg ref="testHandler" /><!-- 方法名 --><property name="defaultListenerMethod" value="handlerTest"></property><property name="messageConverter" ref="jsonMessageConverter"></property>
</bean><!-- 配置监听acknowledeg="manual"设置手动应答,它能够保证即使在一个worker处理消息的时候用CTRL+C来杀掉这个worker,或者一个consumer挂了(channel关闭了、connection关闭了或者TCP连接断了),也不会丢失消息。因为RabbitMQ知道没发送ack确认消息导致这个消息没有被完全处理,将会对这条消息做re-queue处理。如果此时有另一个consumer连接,消息会被重新发送至另一个consumer会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20"><rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" /><rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /><rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
</rabbit:listener-container>

 

5,生产者(发送端)代码:

@Resource  
private RabbitTemplate rabbitTemplate;
private String queueName;  
public void sendMessage(CommonMessage msg){try {  logger.error("发送信息开始");System.out.println(rabbitTemplate.getConnectionFactory().getHost());  //发送信息  queueName交换机,就是上面的routingKey msg.getSource() 为 test_key 
             rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg);//如果是普通字符串消息需要先序列化,再发送消息//rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg));logger.error("发送信息结束");} catch (Exception e) {  e.printStackTrace();}}public void setQueueName(String queueName) {this.queueName = queueName;
}

 

6,消费端代码:TestHandler 类

public class TestHandler  {@Overridepublic void handlerTest(CommonMessage commonMessage) {System.out.println("DetailQueueConsumer: " + new String(message.getBody()));}
}

其他exchangeType介绍:

fanOut:

<!-- Fanout 扇出,顾名思义,就是像风扇吹面粉一样,吹得到处都是。如果使用fanout类型的exchange,那么routing key就不重要了。因为凡是绑定到这个exchange的queue,都会受到消息。 -->
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">  <rabbit:bindings>  <rabbit:binding queue="test_delay_queue"/>  </rabbit:bindings>  
</rabbit:fanout-exchange>

topic:如果说direct是将消息放到exchange绑定的一个queue里(一对一);fanout是将消息放到exchange绑定的所有queue里(一对所有);那么topic类型的exchange就可以实现(一对部分),应用场景就是打印不同级别的错误日志,我们的系统出错后会根据不同的错误级别生成error_levelX.log日志,我们在后台首先要把所有的error保存在一个总的queue(绑定了一个*.error的路由键)里,然后再按level分别存放在不同的queue。

<!-- 发送端不是按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此 -->
<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange"><rabbit:bindings><rabbit:binding queue="Q1" pattern="error.*.log" /><rabbit:binding queue="Q2" pattern="error.level1.log" /><rabbit:binding queue="Q3" pattern="error.level2.log" /></rabbit:bindings>
</rabbit:topic-exchange>

routing key绑定如下图:

 

本文转自:

https://blog.csdn.net/nandao158/article/details/81065892

https://www.cnblogs.com/LipeiNet/p/6079427.html

https://www.toutiao.com/a6598154241037042189/?tt_from=mobile_qq&utm_campaign=client_share&timestamp=1536277584&app=news_article&utm_source=mobile_qq&iid=43157585039&utm_medium=toutiao_android&group_id=6598154241037042189

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/277331.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Multiverse: Revolutionary Backend for Alembic // Multiverse: 下一代Alembic后端

J CUBE&#xff0c;日本最大的动画公司Polygon Picture&#xff08;以下简称PPI&#xff09;公司成立的专职R&D公司隆重推出Multiverse&#xff0c;下一代Alembic存储后端。 我们还开发了针对Autodesk Maya的工具&#xff0c;运用Multiverse在流程中。 "multiverse&qu…

近半年能力没进步原因分析与求助

2019独角兽企业重金招聘Python工程师标准>>> 20180907 思维方式有缺陷&#xff0c;想到的解决方法经常不是最有效率的。导致工作时间内基本没自由学习的时间。 业余时间不够专注&#xff0c;学习方向经常变&#xff0c;没能坚持搞透一个点就换书看&#xff0c;没有总…

windows下安装Redis并部署成服务

文章来源&#xff1a;https://www.cnblogs.com/weiqinl/p/6490372.html windows下安装Redis并部署成服务 Redis 是一个开源&#xff08;BSD许可&#xff09;的&#xff0c;内存中的数据结构存储系统&#xff0c;它可以用作数据库、缓存和消息中间件。 一&#xff1a;下载 下载地…

.net core高性能通讯开源组件BeetleX

BeetleX beetleX是基于dotnet core实现的轻量级高性能的TCP通讯组件&#xff0c;使用方便、性能高效和安全可靠是组件设计的出发点&#xff01;开发人员可以在Beetlx组件的支持下快带地构建高性能的TCP通讯服务程序&#xff0c;在安全通讯方面只需要简单地设置一下SSL信息即可实…

按组排名

rank() over,dense_rank() over,row_number() over的区别 1.rank() over&#xff1a;查出指定条件后的进行排名。特点是&#xff0c;加入是对学生排名&#xff0c;使用这个函数&#xff0c;成绩相同的两名是并列&#xff0c;下一位同学空出所占的名次。 select name,subject,sc…

[FxCop.设计规则]13. 定义自定义属性参数的访问属性

13. 定义自定义属性参数的访问属性 翻译概述&#xff1a; 一个比较无聊的规则&#xff0c;实在看不出在什么情况下&#xff0c;一个开发者会做出违反这条规则的设计。没有别的内容&#xff0c;只是说应该为自定义特性的构造函数中的参数提供一个相关的属性去读取它们的值。…

centos7安装Cloudera Manager

第一部分&#xff1a;准备工作一&#xff0c;修改hostname $vim /etc/sysconfig/network $source /etc/sysconfig/network例如&#xff1a; NETWORKINGyes HOSTNAMEspark01reboot重启服务器 二&#xff0c;关闭selinux查看SELinux状态1&#xff0c;/usr/sbin/sestatus -v #如果…

Grove——.NET中的ORM实现

Grove——.NET中的ORM实现 发布日期&#xff1a; 6/30/2005| 更新日期&#xff1a; 6/30/2005作者&#xff1a;林学鹏 ORM的全称是Object Relational Mapping&#xff0c;即对象关系映射。它的实质就是将关系数据&#xff08;库&#xff09;中的业务数据用对象的形式表示出来&a…

[book]道法自然

前不久读了王咏刚的〈凌波微步〉和〈凌波微步II〉&#xff0c;感觉不错。今天把他老人家的《道法自然》也买了下来。在dearbook看到关于这本书的长篇大评&#xff0c;也一块copy了下来&#xff1a;http://www.dearbook.com.cn/book/viewbook.aspx?pnoTS0023954认真的作者&…

列表嵌套字典,根据字典某一key排序

在返回列表嵌套字典时候&#xff0c;往往需要对数据进行一定的处理&#xff1a;按照字典中某一个key排序 In [87]: a [{"name": "牛郎", "age": 23},{"name":"许仙", "age": 20},{"name":"董永&q…

写出C语言中5种数据类型的名称及其关键字,求C语言中的32个关键字及其意思?...

关键字如下&#xff1a;一、数据类型关键字(12个)&#xff1a;(1) char &#xff1a;声明字符型变量或函数(2) double &#xff1a;声明双精度变量或函数(3) enum &#xff1a;声明枚举类型(4) float&#xff1a;声明浮点型变量或函数(5) int&#xff1a; 声明整型变量或函数(6…

想要设计自己的微服务?看这篇文章就对了

欢迎大家前往腾讯云社区&#xff0c;获取更多腾讯海量技术实践干货哦~ 本文由我就静静地看 发表于云社区专栏 本文通过使用Spring Boot&#xff0c;Spring Cloud和Docker构建的概念验证应用程序的示例&#xff0c;为了解常见的微服务架构模式提供了一个起点。 该代码在Github上…

mysql 开发进阶篇系列 41 mysql日志之慢查询日志

一.概述 慢查询日志记录了所有的超过sql语句( 超时参数long_query_time单位 秒&#xff09;&#xff0c;获得表锁定的时间不算作执行时间。慢日志默认写入到参数datadir(数据目录)指定的路径下。默认文件名是[hostname]_slow.log&#xff0c;默认超时是10秒&#xff0c;默认不开…

分数相同名次排名规则C语言,如何给数据排名(相同分数相同名次)-excel篇

使用Rank函数来做数据排名该函数是返回一个数值在一个数字列表中的排名。语法&#xff1a;RANK(number,ref,order)RANK(对象,范围,参数)number(必填参数):是特定单位格中的数据&#xff0c;需要在整个数字列表中排名的单个对象。ref(必填参数):是指需要排名的整体数列。即范围&…

MySql的连接查询

若一个查询同时涉及到两个或者两个以上的表&#xff0c;则称之为连接查询。常见的包括&#xff1a;等值连接查询&#xff0c;自然连接查询&#xff0c;非等值连接查询&#xff0c;自身连接查询&#xff0c;外连接查询&#xff08;左右连接&#xff09;。 1.等值与非等值连接查询…

qt运行C语言后无显示,qt designer启动后不显示界面问题的原因与解决办法-站长资讯中心...

Qt 5.6.1无论是在vs里双击ui文件还是直接启动designer.exe都一直无法显示界面&#xff0c;但任务管理器中可以看到该进程是存在的。前几天还正常的&#xff0c;但昨天加了一块NVIDIA的显卡(机器自带核显)&#xff0c;可能与此有关。幸好还可以通过QtCreator打开ui文件进行编辑。…

OpenSolaris北京用户组的第一次活动

OpenSolaris北京用户组的第一次活动作者: BadcoffeeEmail: blog.olivergmail.comBlog: http://blog.csdn.net/yayong2005年10月10月15号&#xff0c;OpenSolaris北京用户组在北京西郊宾馆会议厅组织了成立以来的第一次活动。尽管OpenSolaris早在2005年6月14日就正式开放源代码&…

Android PermissionUtils:运行时权限工具类及申请权限的正确姿势

Android PermissionUtils&#xff1a;运行时权限工具类及申请权限的正确姿势 ifadai 关注 2017.06.16 16:22* 字数 318 阅读 3637评论 1喜欢 6PermissionUtil 经常写Android运行时权限申请代码&#xff0c;每次都是复制过来之后&#xff0c;改一下权限字符串就用&#xff0c;把…

Linux基础监控小工具nmon

nmon是一种在AIX与各种Linux操作系统上广泛使用的监控与分析工具&#xff0c; nmon所记录的信息是比较全面的&#xff0c;它能在系统运行过程中实时地捕捉系统资源的使用情况&#xff0c;并且能输出结果到文件中。nmon工具可以帮助在一个屏幕上显示所有重要的性能优化信息&…

vue的配置环境篇

1.电脑已经安装的nodejs和webpack。 2.1&#xff09;打开cmd。winr。可以直接输入node -v查看版本。安装淘宝镜像 npm install -g cnpm --registryhttp://registry.npm.taobao.org &#xff0c;安装成功可以查看下&#xff0c;cnpm -v 3.安装vue脚手架&#xff0c;输入命令&am…