rabbitmq direct 多个消费者_一文解析 RabbitMQ 最常用的三大模式

326b7a263b51ee38b3ad551ea36120ce.png

Direct 模式

  • 所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。
  • Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exchange 进行任何绑定(binding)操作。
  • 消息传递时,RouteKey 必须完全匹配才会被队列接收,否则该消息会被抛弃,

ad388b7026de605ce5d8e21b2535345c.png
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class DirectProducer {public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");//2. 通过连接工厂来创建连接Connection connection = factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel = connection.createChannel();//4. 声明String exchangeName = "test_direct_exchange";String routingKey = "item.direct";//5. 发送String msg = "this is direct msg";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());System.out.println("Send message : " + msg);//6. 关闭连接channel.close();connection.close();}
}
import com.rabbitmq.client.*;
import java.io.IOException;public class DirectConsumer {public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);//2. 通过连接工厂来创建连接Connection connection = factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel = connection.createChannel();//4. 声明String exchangeName = "test_direct_exchange";String queueName = "test_direct_queue";String routingKey = "item.direct";channel.exchangeDeclare(exchangeName, "direct", true, false, null);channel.queueDeclare(queueName, false, false, false, null);//一般不用代码绑定,在管理界面手动绑定channel.queueBind(queueName, exchangeName, routingKey);//5. 创建消费者并接收消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};//6. 设置 Channel 消费者绑定队列channel.basicConsume(queueName, true, consumer);}
}
Send message : this is direct msg[x] Received 'this is direct msg'
Topic 模式

Topic 模式

可以使用通配符进行模糊匹配

  • 符号'#" 匹配一个或多个词
  • 符号"*”匹配不多不少一个词

例如:

  • 'log.#"能够匹配到'log.info.oa"
  • "log.*"只会匹配到"log.erro“

b39cb721a3040fd71aac40aee017256e.png
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class TopicProducer {public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");//2. 通过连接工厂来创建连接Connection connection = factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel = connection.createChannel();//4. 声明String exchangeName = "test_topic_exchange";String routingKey1 = "item.update";String routingKey2 = "item.delete";String routingKey3 = "user.add";//5. 发送String msg = "this is topic msg";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());System.out.println("Send message : " + msg);//6. 关闭连接channel.close();connection.close();}
}
import com.rabbitmq.client.*;
import java.io.IOException;public class TopicConsumer {public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);//2. 通过连接工厂来创建连接Connection connection = factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel = connection.createChannel();//4. 声明String exchangeName = "test_topic_exchange";String queueName = "test_topic_queue";String routingKey = "item.#";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, false, false, false, null);//一般不用代码绑定,在管理界面手动绑定channel.queueBind(queueName, exchangeName, routingKey);//5. 创建消费者并接收消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};//6. 设置 Channel 消费者绑定队列channel.basicConsume(queueName, true, consumer);}
}
Send message : this is topc msg[x] Received 'this is topc msg'
[x] Received 'this is topc msg'

Fanout 模式

不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
Fanout交换机转发消息是最快的。

95566e06362d14e2aa6f704419bb141b.png
import com.rabbitmq.client.*;
import java.io.IOException;public class FanoutConsumer {public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);//2. 通过连接工厂来创建连接Connection connection = factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel = connection.createChannel();//4. 声明String exchangeName = "test_fanout_exchange";String queueName = "test_fanout_queue";String routingKey = "item.#";channel.exchangeDeclare(exchangeName, "fanout", true, false, null);channel.queueDeclare(queueName, false, false, false, null);//一般不用代码绑定,在管理界面手动绑定channel.queueBind(queueName, exchangeName, routingKey);//5. 创建消费者并接收消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};//6. 设置 Channel 消费者绑定队列channel.basicConsume(queueName, true, consumer);}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class FanoutProducer {public static void main(String[] args) throws Exception {//1. 创建一个 ConnectionFactory 并进行设置ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");//2. 通过连接工厂来创建连接Connection connection = factory.newConnection();//3. 通过 Connection 来创建 ChannelChannel channel = connection.createChannel();//4. 声明String exchangeName = "test_fanout_exchange";String routingKey1 = "item.update";String routingKey2 = "";String routingKey3 = "ookjkjjkhjhk";//任意routingkey//5. 发送String msg = "this is fanout msg";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());System.out.println("Send message : " + msg);//6. 关闭连接channel.close();connection.close();}
}
Send message : this is fanout msg[x] Received 'this is fanout msg'
[x] Received 'this is fanout msg'
[x] Received 'this is fanout msg'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/454213.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 读取保存App.config配置文件的完整源码参考

最近出差在北京做一个小项目,项目里需要读取配置文件的小功能,觉得挺有参考意义的就把代码发上来给大家参考一下。我们选择了直接用微软的读取配置文件的方法。 这个是程序的运行设计效果,就是把这些参数可以进行灵活设置,灵活保存…

TensorFlow 简介

TensorFlow介绍 Tagline:An open-source software library for Machine Intelligence.Definition:TensorFlow TM is an open source software library fornumerical computation using data flow graphs.GitHub:https://github.com/tensorfl…

webbrowser设置为相应的IE版本

注册表路径: HKEY_LOCAL_MACHINE\SOFTWARE\WOW6432Node\Microsoft\Internet Explorer\Main\FeatureControl\FEATURE_BROWSER_EMULATION 或者HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Internet Explorer\Main\FeatureControl\FEATURE_BROWSER_EMULATION 究竟选择哪一个…

jmeter压力测试_用Jmeter实现对接口的压力测试

一、多个真实用户对接口的压力测试1. 获取多个真实用户的token的两种方法:1)第一种:让开发帮忙生成多个token(多个用户账户生成的token),导出为csv格式的文件(以下步骤均以该方法为基础)2)第二种:自己设置多个用户账户和密码&…

程序员成长之路(转)

什么时候才能成为一个专业程序员呢?三年还是五年工作经验?其实不用的,你马上就可以了,我没有骗你,因为专业程序员与业余程序员的区别主要在于一种态度,如果缺乏这种态度,拥有十年工作经验也还是…

嵌入式开发——PWM高级定时器

学习目标 加强掌握PWM开发流程理解定时器与通道的关系掌握多通道配置策略掌握互补PWM配置策略掌握定时器查询方式掌握代码抽取优化策略掌握PWM调试方式学习内容 需求 点亮8个灯,采用pwm的方式。 定时器 通道 <

TensorFlow 基本操作

Tensorflow基本概念 图(Graph):图描述了计算的过程&#xff0c;TensorFlow使用图来表示计算任务。张量(Tensor):TensorFlow使用tensor表示数据。每个Tensor是一个类型化的多维数组。操作(op):图中的节点被称为op(opearation的缩写)&#xff0c;一个op获得/输入0个或多个Tensor…

TensorFlow 分布式

一、简介 使用单台机器或者单个GPU/CPU来进行模型训练&#xff0c;训练速度会受资源的影响&#xff0c;因为毕竟单个的设备的计算能力和存储能力具有一定的上限的&#xff0c;针对这个问题&#xff0c;TensorFlow支持分布式模型运算&#xff0c;支持多机器、多GPU、多CPU各种模…

第五周测试

---恢复内容开始--- 一 视频知识 1 linux系统下如何区分内核态与用户态 在内核态&#xff1a;cs:eip可以是任意的地址&#xff0c;4G的内存地址空间 在用户态&#xff1a;cs:eip只能访问0x00000000—0xbfffffff的地址空间 2 系统调用的三层皮&#xff1a;xyz、system_call和sys…

latex公式对齐_Word 写公式最方便的方法

自从用上了word 2016之后&#xff0c;发现他的公式编辑器真香!真香!!他有了latex的优雅&#xff0c;又有了Mathtype的可视化效果&#xff0c;甚至更好哈&#xff0c;当编辑大量公式时也不会因为插件问题卡掉当前的努力。学起来也不复杂&#xff0c;反正是word. 强烈推荐。我们最…

路要怎么走?关于程序员成长的一点思考

程序员的我们&#xff0c;是否想过今后的路该怎么走、如何发展、技术怎样提高?其实这也是我一直在思考的问题。下面就此问题&#xff0c;分享下我的看法。因为我阅历有限&#xff0c;有什么说的不对的&#xff0c;大家见谅&#xff0c;千万不要喷…… 一、程序员应该打好基础 …

TensorFlow 常见API

数据类型转换相关API Tensor Shape获取以及设置相关API Tensor合并、分割相关API Error相关类API 常量类型的Tensor对象相关API 序列和随机Tensor对象相关API Session相关API 逻辑运算符相关API 比较运算符相关API 调试相关API 图像处理-编码解码相关API 图像处理-调整大小相关…

8.2 命令历史

2019独角兽企业重金招聘Python工程师标准>>> 命令历史 history //查看之前的命令.bash_history //存放之前敲过的命令&#xff0c;在 /root/ 目录下最大1000条 //默认参数值是1000条变量HISTSIZE/etc/profile中修改 //在其中可编辑HISTSIZE参数HISTTIMEFORMAT"…

TensorFlow 实例一:线性回归模型

代码 # -- encoding:utf-8 -- """ Create by ibf on 2018/5/6 """import numpy as np import tensorflow as tf# 1. 构造一个数据 np.random.seed(28) N 100 x np.linspace(0, 6, N) np.random.normal(loc0.0, scale2, sizeN) y 14 * x - …

Dapper的基本使用

Dapper是.NET下一个micro的ORM&#xff0c;它和Entity Framework或Nhibnate不同&#xff0c;属于轻量级的&#xff0c;并且是半自动的。也就是说实体类都要自己写。它没有复杂的配置文件&#xff0c;一个单文件就可以了。给出官方地址。 http://code.google.com/p/dapper-dot-n…

易语言神经网络验证码识别_递归神经网络 GRU+CTC+CNN 教会验证码识别

利用 NLP 技术做简单数据可视化分析Chat 简介&#xff1a;用递归神经网络采用端到端识别图片文字&#xff0c;递归神经网络大家最早用 RNN &#xff0c;缺陷造成梯度消失问题&#xff1b;然后采用了 LSTM&#xff0c;解决 RNN 问题&#xff0c;并且大大提高准确率&#xff1b;现…

Android4.0蓝牙使能的详细解析

毫无疑问&#xff0c;bluetooth的打开是在Settings中进行的操作。因此&#xff0c;冤有头&#xff0c;债有主&#xff0c;我们来到了Settings.java中&#xff0c;果然发现了相关的代码如下&#xff1a; mBluetoothEnabler new BluetoothEnabler(context, new Switch(context));…

移动端导出excel_连载系列【4】Excel开发移动端quot;APPquot;

前三篇文章介绍了百度地图生成器、源代码编辑器、GPS经纬度批量转换工具、源代码编辑器中添加自定义功能按钮和地图控件。这些写好的Java Script代码虽然可以实现所有期望的结果&#xff0c;但毕竟不是一个HTML文件&#xff0c;不便于传播和使用&#xff0c;更无法变成一个类似…

《操作系统》OS学习(二):启动、中断、异常

Bootloader:加载OS。操作系统一开始是放在DISK&#xff08;硬盘&#xff09;中&#xff0c;并不是放在内存中。 BIOS&#xff1a;基本I/O处理系统。存放在ROMRead-Only Memory&#xff09;只读存储中 BIOS&#xff08;Basic Input/Output System&#xff09;基本输入输出系统。…

常用css属性集(持续更新…)

禁止换行&#xff0c;超出部分显示…&#xff1a;a. 代码&#xff1a;.hide_word{ max-width: 100px; white-space:nowrap; overflow:hidden; text-overflow:ellipsis; } b. 效果&#xff1a; 本文转自 bilinyee博客&#xff0c;原文链接&#xff1a; http://blog.51cto.co…