RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange)


  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。


wKiom1YZeRCgrXh9AAA8_sKBnCU136.jpg

  相反的,生产者只能发送消息给交换机(Exchange)。交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中。交换机必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义。


  交换机的类型有:direct,topic,headers 和 fanout。我们以fanout为例创建一个“logs”类型的交换机。


1
channel.exchangeDeclare("logs""fanout");


  fanout交换机非常简单,它会广播它收到的所有队列的所有消息。


  交换机命名


  在前面的例子中,我们不了解交换机的任何概念,也能发送消息,这是因为我们使用了默认的交换机(""),但以后可以使用我们自定义的交换机了。


1
2
channel.basicPublish("""hello", null, message.getBytes()); //空字符串交换机
channel.basicPublish( "logs""", null, message.getBytes()); //logs交换机


  临时队列(Temporary Queues)


  在前面的例子中,我们为队列都指定了具体的名字(如hello和task_queue),给队列命名是非常重要的事情,因为生产者和消费者是队列名称来传递消息的。


  但是对于日志来说的消息队列,我们会监听所有的日志消息,而不是其中的一些子集。而且我们只关注当前发生的消息而不是历史消息,要解决这些问题需要这么做:


  首先,当我们连接Rabbit服务器时,我们需要一个新的空队列。我们可以自己随机生成一个队列名字或者让服务器随机生成一个队列名字。


  其次,当消息消费者失去连接时,队列应该自动删除。


  在Java中,我们使用不带参数的queueDeclare()方法创建一个非持久化的,唯一的,用后自动删除的队列。


1
String queueName = channel.queueDeclare().getQueue();


  queueName可能是像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的随机队列名。


  消息绑定(Bindings)


  前面我们创建了一个fanout类型的交换机和队列。现在需要告诉交换机发送消息到队列。交换机和队列之间的关系就是消息绑定(binding)。


wKioL1YZemfT9Am4AAA7uKydZt4912.jpg

  使用下面的代码logs交换机会将消息传递给队列。


1
channel.queueBind(queueName, "logs""");


  将交换机和消息绑定放在一起


wKiom1YZetLSZ2z_AABbZcUcUF0159.jpg


  现在我们有一个提交日志的的消息生产者,它与我们之前的消息发送者并没有太大的区别,唯一不同的地方是我们将消息发送到 logs 交换机,而不是没有名字的交换机。当发送消息时,我们需要提供一个路由,尽管它在 fanout 交换机中并没有什么作用。下面是提交日志的Java代码。


  EmitLog.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.favccxx.favrabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
 private static final String EXCHANGE_NAME = "logs";
 public static void main(String[] argv) throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  String[] sendMsgs = {"I""saw""a""dog"};
  String message = getMessage(sendMsgs);
  channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes("UTF-8"));
  System.out.println(" [x] Sent '" + message + "'");
  channel.close();
  connection.close();
 }
 private static String getMessage(String[] strings) {
  if (strings.length < 1)
   return "info: Hello World!";
  return joinStrings(strings, " ");
 }
 private static String joinStrings(String[] strings, String delimiter) {
  int length = strings.length;
  if (length == 0)
   return "";
  StringBuilder words = new StringBuilder(strings[0]);
  for (int i = 1; i < length; i++) {
   words.append(delimiter).append(strings[i]);
  }
  return words.toString();
 }
}



  正如上面所示,与消息服务器建立连接后,声明了一个交换机,这是因为系统不允许发布到空交换机。 如果没有队列绑定到交换机的话,消息就会丢失,但我们不用担心。如果没有消费者监听消息的话,我们就丢弃该消息。


  接收消息代码ReceiveLogs.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.favccxx.favrabbit;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogs {
 private static final String EXCHANGE_NAME = "logs";
 public static void main(String[] argv) throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  String queueName = channel.queueDeclare().getQueue();
  channel.queueBind(queueName, EXCHANGE_NAME, "");
  System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  Consumer consumer = new DefaultConsumer(channel) {
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
     byte[] body) throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
   }
  };
  channel.basicConsume(queueName, true, consumer);
 }
}



  测试数据


  运行几个日志消息接收者实例,使用日志消息发送者发送消息,发现每个日志消息接收者都接收到同样的数据,说明发布订阅成功。


1
 [x] Received 'I saw a dog'




本文转自 genuinecx 51CTO博客,原文链接:http://blog.51cto.com/favccxx/1701738,如需转载请自行联系原作者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/453907.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OAuth 2.0(网转)

&#xff08;一&#xff09;背景知识 OAuth 2.0很可能是下一代的“用户验证和授权”标准&#xff0c;目前在国内还没有很靠谱的技术资料。为了弘扬“开放精神”&#xff0c;让业内的人更容易理解“开放平台”相关技术&#xff0c;进而长远地促进国内开放平台领域的发展&#xf…

kafka 自动提交 和 手动提交

Consumer 需要向 Kafka 汇报自己的位移数据&#xff0c;这个汇报过程被称为提交位移&#xff08;Committing Offsets&#xff09;。因为 Consumer 能够同时消费多个分区的数据&#xff0c;所以位移的提交实际上是在分区粒度上进行的&#xff0c;即 Consumer 需要为分配给它的每…

axios vue 回调函数_vue中ajax请求与axios包完美处理

这次给大家带来vue中ajax请求与axios包完美处理&#xff0c;vue中ajax请求与axios包处理的注意事项有哪些&#xff0c;下面就是实战案例&#xff0c;一起来看一下。在vue中&#xff0c;经常会用到数据请求&#xff0c;常用的有&#xff1a;vue-resourse、axios今天我说的是axio…

用int还是用Integer?

昨天例行code review时大家有讨论到int和Integer的比较和使用。 这里做个整理&#xff0c;发表一下个人的看法。【int和Integer的区别】int是java提供的8种原始类型之一&#xff0c;java为每个原始类型提供了封装类&#xff0c;Integer是int的封装类。int默认值是0&#xff0c;…

前端之 JavaScript 常用数据类型和操作

JavaScript 常用数据类型有&#xff1a;数字、字符串、布尔、Null、Undefined、对象 JavaScript 拥有动态类型 JavaScript 拥有动态类型。这意味着相同的变量可用作不同的类型 var x; // 此时x是undefined var x 1; // 此时x是数字 var x "Alex" …

mysql备份还原(视图、存储过程)

最近在备份还原mysql的时候发现&#xff0c;视图还原报错&#xff0c;无法创建视图&#xff0c;在网上查了下资料&#xff0c;找到以下信息&#xff1a;1、如果备份的数据库含有视图,还原时需要把my.ini中的character-set改为latin1,才能够还原视图。2、还原后,需要把latin1改为…

有关javabean的说法不正确的是_关于 JavaBean, 下列叙述中不正确的是 ( ) 。_学小易找答案...

【填空题】在使用 URL 传值时传输的数据只能是 类型。【简答题】陶器是人类最伟大的发明,比四大发明更有意义,你如何认为?(手机上直接回答提交)【单选题】对于 ( ) 作用范围的 Bean, 当客户离开这个页面时 JSP 引擎取消为客户的该页 面分配的 Bean, 释放他所占的内存空间。【填…

Postgres中tuple的组装与插入

1.相关的数据类型 我们先看相关的数据类型&#xff1a; HeapTupleData(src/include/access/htup.h) typedef struct HeapTupleData {uint32 t_len; /* length of *t_data */ItemPointerData t_self; /* SelfItemPointer */Oid t_tableOid; /* ta…

Python 自动生成环境依赖包 requirements

一、生成当前 python 环境 安装的所有依赖包 1、命令 # cd 到项目路径下&#xff0c;执行以下命令 pip freeze > requirements.txt# 或者使用如下命令 pip list --formatfreeze > requirements.txt 2、常见问题 1、中使用 pip freeze > requirements.txt 命令导出…

DenyHosts 加固centos系统安全

DenyHosts是Python语言写的一个程序&#xff0c;它会分析sshd的日志文件&#xff08;/var/log/secure&#xff09;&#xff0c;当发现重 复的攻击时就会记录IP到/etc/hosts.deny文件&#xff0c;从而达到自动屏IP的功能 DenyHosts官方网站 http://denyhosts.sourceforge.net 下…

在windows xp下编译出ffmpeg.exe

找了好多资料&#xff0c;把自己的编译成功过程详细叙述&#xff0c;以避免后来者可以少浪费点时间。 1.安装MSys 到http://sourceforge.net/project/showfiles.php?group_id2435下载文件&#xff1a;   bash-3.1-MSYS-1.0.11-tar.bz2   msysCORE-1.0.11-2007.01.19-1.ta…

手机uc怎么放大页面_手机网站怎样做可以提高用户体验度?——竹晨网络

目前&#xff0c;手机已经占据了人们大多数的闲暇时间&#xff0c;互联网的流量开始逐渐向移动端倾斜&#xff0c;重视移动端的用户体验&#xff0c;就可以给客户端增加很多意想不到的功能。但是还是有很多公司和站长不知道手机网站应该怎么建才能符合用户的使用习惯。下面&…

科技申报项目总结

这个项目分为三大模块&#xff0c;管理员&#xff0c;专家以及单位模块&#xff0c;具体页面有&#xff1a;1单位信息&#xff1b;2项目申报&#xff1b;3专家信息&#xff1b;4项目评审&#xff1b;5 项目信息&#xff1b;6申报设置&#xff1b;7专家信息。 —-项目框架SSM&am…

kafka 异常:ERROR Failed to clean up log for __consumer_offsets-30 in dir /tmp/kafka-logs due to IOExce

问题概述 kafka进程不定期挂掉。ERROR Failed to clean up log for __consumer_offsets-30 in dir /tmp/kafka-logs due to IOException (kafka.server.LogDirFailureChannel)&#xff0c;报错如下 [2020-12-07 16:12:36,803] ERROR Failed to clean up log for __consumer_o…

树形控件(CTreeCtrl和CTreeView)

如何插入数据项目&#xff1f;如何添加鼠标右击事件&#xff1f;插入数据项 通过InsertItem()方法&#xff0c;有四种重载样式: HTREEITEM InsertItem(LPTVINSERTSTRUCT lpInsertStruct); HTREEITEM InsertItem(UINT nMask, LPCTSTR lpszItem, int nImage,int nSelectedImage, …

ffmpeg编译(生成Windows或Win32平台dll, lib)

ffmpeg编译(生成Windows或Win32平台dll, lib) 介绍&#xff1a;本文简要介绍通过cygwin环境来编译生成ffmpeg。 包括解码组件libfaad与libopencore-amrnb的编译。 1)安装msys mingw环境 具体安装过程可以看网上教程 我用的是&#xff1a;http://code.google.com/p/msys-cn/ 假…

2019python课件_2019版经典Python学习路线分享

Python有三大神器&#xff0c;包括numpy,scipy,matplotlib,因此适合用于数据处理。spark&#xff0c;Hadoop都开了Python的接口&#xff0c;所以使用Python做Python的mapreduce也非常简单。因此它也备受欢迎&#xff0c;python学习大纲分享给大家。一、Python基础1.2数据的存储…

UML之涉众/参与者(角色/执行者)(Actor)/业务主角(BusinessActor)/业务工人(BusinessWorker)/用户/角色辨析【图解】...

参考文档&#xff1a; 【业务建模】(http://www.baike.com/wiki/%E4%B8%9A%E5%8A%A1%E5%BB%BA%E6%A8%A1) 【UML 核心元素之参与者】(http://www.voidcn.com/article/p-obarwwaq-tp.html) 【UML核心元素之参与者】(http://www.voidcn.com/article/p-ntpnhoue-da.html)转载于:htt…

git 报错:Please make sure you have the correct access rights and the repository exists

提示&#xff1a;Warning: Permanently added gitee.com,120.55.226.24 (ECDSA) to the list of known hosts.是公钥出问题了&#xff0c;要先设置用户和邮箱再重新生成ssh公钥即可。 1、首先我得重新在git设置一下身份的名字和邮箱 进入到需要提交的文件夹底下&#xff08;…

java 实现excel 导出功能

实现功能&#xff1a;java导出excel表 1、jsp代码 1 <form id"zhanwForm" action"<%path%>/conferences.do?" target"_self" method"get" > 2 <input type"hidden" name"method" value…