RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange)


  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。


wKiom1YZeRCgrXh9AAA8_sKBnCU136.jpg

  相反的,生产者只能发送消息给交换机(Exchange)。交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中。交换机必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义。


  交换机的类型有:direct,topic,headers 和 fanout。我们以fanout为例创建一个“logs”类型的交换机。


1
channel.exchangeDeclare("logs""fanout");


  fanout交换机非常简单,它会广播它收到的所有队列的所有消息。


  交换机命名


  在前面的例子中,我们不了解交换机的任何概念,也能发送消息,这是因为我们使用了默认的交换机(""),但以后可以使用我们自定义的交换机了。


1
2
channel.basicPublish("""hello", null, message.getBytes()); //空字符串交换机
channel.basicPublish( "logs""", null, message.getBytes()); //logs交换机


  临时队列(Temporary Queues)


  在前面的例子中,我们为队列都指定了具体的名字(如hello和task_queue),给队列命名是非常重要的事情,因为生产者和消费者是队列名称来传递消息的。


  但是对于日志来说的消息队列,我们会监听所有的日志消息,而不是其中的一些子集。而且我们只关注当前发生的消息而不是历史消息,要解决这些问题需要这么做:


  首先,当我们连接Rabbit服务器时,我们需要一个新的空队列。我们可以自己随机生成一个队列名字或者让服务器随机生成一个队列名字。


  其次,当消息消费者失去连接时,队列应该自动删除。


  在Java中,我们使用不带参数的queueDeclare()方法创建一个非持久化的,唯一的,用后自动删除的队列。


1
String queueName = channel.queueDeclare().getQueue();


  queueName可能是像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 这样的随机队列名。


  消息绑定(Bindings)


  前面我们创建了一个fanout类型的交换机和队列。现在需要告诉交换机发送消息到队列。交换机和队列之间的关系就是消息绑定(binding)。


wKioL1YZemfT9Am4AAA7uKydZt4912.jpg

  使用下面的代码logs交换机会将消息传递给队列。


1
channel.queueBind(queueName, "logs""");


  将交换机和消息绑定放在一起


wKiom1YZetLSZ2z_AABbZcUcUF0159.jpg


  现在我们有一个提交日志的的消息生产者,它与我们之前的消息发送者并没有太大的区别,唯一不同的地方是我们将消息发送到 logs 交换机,而不是没有名字的交换机。当发送消息时,我们需要提供一个路由,尽管它在 fanout 交换机中并没有什么作用。下面是提交日志的Java代码。


  EmitLog.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.favccxx.favrabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
 private static final String EXCHANGE_NAME = "logs";
 public static void main(String[] argv) throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  String[] sendMsgs = {"I""saw""a""dog"};
  String message = getMessage(sendMsgs);
  channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes("UTF-8"));
  System.out.println(" [x] Sent '" + message + "'");
  channel.close();
  connection.close();
 }
 private static String getMessage(String[] strings) {
  if (strings.length < 1)
   return "info: Hello World!";
  return joinStrings(strings, " ");
 }
 private static String joinStrings(String[] strings, String delimiter) {
  int length = strings.length;
  if (length == 0)
   return "";
  StringBuilder words = new StringBuilder(strings[0]);
  for (int i = 1; i < length; i++) {
   words.append(delimiter).append(strings[i]);
  }
  return words.toString();
 }
}



  正如上面所示,与消息服务器建立连接后,声明了一个交换机,这是因为系统不允许发布到空交换机。 如果没有队列绑定到交换机的话,消息就会丢失,但我们不用担心。如果没有消费者监听消息的话,我们就丢弃该消息。


  接收消息代码ReceiveLogs.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.favccxx.favrabbit;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogs {
 private static final String EXCHANGE_NAME = "logs";
 public static void main(String[] argv) throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  String queueName = channel.queueDeclare().getQueue();
  channel.queueBind(queueName, EXCHANGE_NAME, "");
  System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  Consumer consumer = new DefaultConsumer(channel) {
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
     byte[] body) throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
   }
  };
  channel.basicConsume(queueName, true, consumer);
 }
}



  测试数据


  运行几个日志消息接收者实例,使用日志消息发送者发送消息,发现每个日志消息接收者都接收到同样的数据,说明发布订阅成功。


1
 [x] Received 'I saw a dog'




本文转自 genuinecx 51CTO博客,原文链接:http://blog.51cto.com/favccxx/1701738,如需转载请自行联系原作者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/453907.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OAuth 2.0(网转)

&#xff08;一&#xff09;背景知识 OAuth 2.0很可能是下一代的“用户验证和授权”标准&#xff0c;目前在国内还没有很靠谱的技术资料。为了弘扬“开放精神”&#xff0c;让业内的人更容易理解“开放平台”相关技术&#xff0c;进而长远地促进国内开放平台领域的发展&#xf…

kafka 自动提交 和 手动提交

Consumer 需要向 Kafka 汇报自己的位移数据&#xff0c;这个汇报过程被称为提交位移&#xff08;Committing Offsets&#xff09;。因为 Consumer 能够同时消费多个分区的数据&#xff0c;所以位移的提交实际上是在分区粒度上进行的&#xff0c;即 Consumer 需要为分配给它的每…

前端之 JavaScript 常用数据类型和操作

JavaScript 常用数据类型有&#xff1a;数字、字符串、布尔、Null、Undefined、对象 JavaScript 拥有动态类型 JavaScript 拥有动态类型。这意味着相同的变量可用作不同的类型 var x; // 此时x是undefined var x 1; // 此时x是数字 var x "Alex" …

Postgres中tuple的组装与插入

1.相关的数据类型 我们先看相关的数据类型&#xff1a; HeapTupleData(src/include/access/htup.h) typedef struct HeapTupleData {uint32 t_len; /* length of *t_data */ItemPointerData t_self; /* SelfItemPointer */Oid t_tableOid; /* ta…

Python 自动生成环境依赖包 requirements

一、生成当前 python 环境 安装的所有依赖包 1、命令 # cd 到项目路径下&#xff0c;执行以下命令 pip freeze > requirements.txt# 或者使用如下命令 pip list --formatfreeze > requirements.txt 2、常见问题 1、中使用 pip freeze > requirements.txt 命令导出…

DenyHosts 加固centos系统安全

DenyHosts是Python语言写的一个程序&#xff0c;它会分析sshd的日志文件&#xff08;/var/log/secure&#xff09;&#xff0c;当发现重 复的攻击时就会记录IP到/etc/hosts.deny文件&#xff0c;从而达到自动屏IP的功能 DenyHosts官方网站 http://denyhosts.sourceforge.net 下…

手机uc怎么放大页面_手机网站怎样做可以提高用户体验度?——竹晨网络

目前&#xff0c;手机已经占据了人们大多数的闲暇时间&#xff0c;互联网的流量开始逐渐向移动端倾斜&#xff0c;重视移动端的用户体验&#xff0c;就可以给客户端增加很多意想不到的功能。但是还是有很多公司和站长不知道手机网站应该怎么建才能符合用户的使用习惯。下面&…

科技申报项目总结

这个项目分为三大模块&#xff0c;管理员&#xff0c;专家以及单位模块&#xff0c;具体页面有&#xff1a;1单位信息&#xff1b;2项目申报&#xff1b;3专家信息&#xff1b;4项目评审&#xff1b;5 项目信息&#xff1b;6申报设置&#xff1b;7专家信息。 —-项目框架SSM&am…

UML之涉众/参与者(角色/执行者)(Actor)/业务主角(BusinessActor)/业务工人(BusinessWorker)/用户/角色辨析【图解】...

参考文档&#xff1a; 【业务建模】(http://www.baike.com/wiki/%E4%B8%9A%E5%8A%A1%E5%BB%BA%E6%A8%A1) 【UML 核心元素之参与者】(http://www.voidcn.com/article/p-obarwwaq-tp.html) 【UML核心元素之参与者】(http://www.voidcn.com/article/p-ntpnhoue-da.html)转载于:htt…

git 报错:Please make sure you have the correct access rights and the repository exists

提示&#xff1a;Warning: Permanently added gitee.com,120.55.226.24 (ECDSA) to the list of known hosts.是公钥出问题了&#xff0c;要先设置用户和邮箱再重新生成ssh公钥即可。 1、首先我得重新在git设置一下身份的名字和邮箱 进入到需要提交的文件夹底下&#xff08;…

java 实现excel 导出功能

实现功能&#xff1a;java导出excel表 1、jsp代码 1 <form id"zhanwForm" action"<%path%>/conferences.do?" target"_self" method"get" > 2 <input type"hidden" name"method" value…

什么是CI/CD

一、简介 CI / CD的采用改变了开发人员和测试人员如何发布软件。 最初是瀑布模型&#xff0c;后来是敏捷开发&#xff0c;现在是DevOps&#xff0c;这是现代开发人员构建出色的产品的技术路线。随着DevOps的兴起&#xff0c;出现了持续集成&#xff08;Continuous Integration…

部署WEB项目到服务器(三)安装mysql到linux服务器(Ubuntu)详解

突发奇想&#xff0c;想在自己电脑上部署一个web网站。 1&#xff0c;首先是下载一个适合自己已安装服务器版本的mysql数据库。 这里使用网上的链接http://dev.mysql.com/downloads/mysql/5.6.html#downloads 或者使用代理网站上下载&#xff1a;https://mirrors.huaweicloud.c…

在Windows下编译ffmpeg完全手册

本文的内容几乎全部来自于FFmpeg on Windows&#xff0c;但是由于国内的网络封锁&#xff0c;很难访问这个域名下的内容&#xff0c;因此我一方面按照我自己的理解和实践做了翻译&#xff0c;另一方面也是为了能提供一个方便的参考方法。 注&#xff1a; 1. 对于compil…

padding和卷积的区别_TensorFlow笔记1——20.CNN卷积神经网络padding两种模式SAME和VALID...

第1种解说&#xff1a;(核心最后一张图&#xff0c;两种填充方式输出的形状尺寸计算公式)在用tensorflow写CNN的时候&#xff0c;调用卷积核api的时候&#xff0c;会有填padding方式的参数&#xff0c;找到源码中的函数定义如下&#xff08;max pooling也是一样&#xff09;&am…

循环神经网络变形之 (Long Short Term Memory,LSTM)

1、长短期记忆网络LSTM简介 在RNN 计算中&#xff0c;讲到对于传统RNN水平方向进行长时刻序列依赖时可能会出现梯度消失或者梯度爆炸的问题。LSTM 特别适合解决这种需要长时间依赖的问题。 LSTM&#xff08;Long Short Term Memory&#xff0c;长短期记忆网络&#xff09;是R…

UE4 ShooterGame Demo的开火的代码

之前一直没搞懂按下鼠标左键开火之后&#xff0c;代码的逻辑是怎么走的&#xff0c;今天看懂了之前没看懂的部分&#xff0c;进了一步 ShooterCharacter.cpp void AShooterCharacter::OnStartFire() {AShooterPlayerController* MyPC Cast<AShooterPlayerController>(Co…

Windows系统使用minGW+msys 编译ffmpeg 0.5的全过程详述

一.环境配置 1.下载并安装 MinGW-5.1.4.exe (http://jaist.dl.sourceforge.net/sourcef … -5.1.4.exe)&#xff0c;安装时选中 g, mingw make。建议安装到c:/mingw. 2.下载并安装 MSYS-1.0.11-rc-1.exe (http://jaist.dl.sourceforge.net/sourcef … 1-rc-1.exe)&#xff0c;安…

程序员 赚钱

业余编程赚钱 程序员的好方法 现在的人生活水平高了&#xff0c;开销也大了&#xff0c;同时对于一些技术性人员来说有很多种&#xff0c;有些程序员自己开公司&#xff0c;开发自己的产品&#xff0c;年赚百万&#xff0c;有些程序员还在给别人打工&#xff0c;每天累死累活的…

代码 优化 指南 实践

C代码优化方案 华中科技大学计算机学院 姓名&#xff1a; 王全明 QQ&#xff1a; 375288012 Email&#xff1a; quanming1119163.com 目录 目录 C代码优化方案 1、选择合适的算法和数据结构 2、使用尽量小的数据类型 3、减少运算的强度 &#xff08;1&…