消息队列——rabbitmq的不同工作模式

目录

Work queues 工作队列模式

 Pub/Sub 订阅模式

Routing路由模式

Topics通配符模式 

 工作模式总结


Work queues 工作队列模式

C1和C2属于竞争关系,一个消息只有一个消费者可以取到。

 代码部分只需要用两个消费者进程监听同一个队里即可。

两个消费者呈现竞争关系。

用一个生产者推送10条消息

        for(int i=0;i<10;i++){String body=i+"hello rabbitmq!!!";channel.basicPublish("","work_queues",null,body.getBytes());}

两个监听的消费者接收情况如下。 

 

 Pub/Sub 订阅模式

一个生产者发送消息后有两个消费者可以收到消息。

生产者把消息发给交换机,交换机再把消息通过Routes路由分发给不同的队列。

//发送消息
public class producer_PubSub {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost(""); //设置ip地址。默认为127.0.0.1factory.setPort(5672);              //端口 默认值5672factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/factory.setUsername("yhy");        //用户名,默认值guestfactory.setPassword("");     //密码,默认值guest//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();/** exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)* 参数:* 1.exchange  : 交换价名称* 2.type      : 交换机类型 ,有四种*               DIRECT("direct"),  定向FANOUT("fanout"),   扇形(广播),发送消息到每一个与之绑定队列TOPIC("topic"),     通配符的方式HEADERS("headers"); 参数匹配*3.durable  :是否持久化* 4.autoDelete:是否自动删除* 5.internal: 内部使用。一般false* 6.arguments:参数* *///5.创建交换机String exchangeName="test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//6.创建队列String queue1Name="test_fanout_queue1";String queue2Name="test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);/** queueBind(String queue, String exchange, String routingKey)* 参数:* queue:队列名* exchange:交换机名称* routingKey:路由键,绑定规则*   如果交换机类型为fanout,routingKey设置为""* *///7.绑定队列和交换机channel.queueBind(queue1Name,exchangeName,"");channel.queueBind(queue2Name,exchangeName,"");String body="日志信息:调用了findAll方法";//8.发送消息channel.basicPublish(exchangeName,"",null,body.getBytes());//9.释放资源channel.close();connection.close();}
}

 运行之后两个队列里面就会多一条消息

两个消费者的代码大同小异,只是绑定的队列名不同,这里只给其中一个

public class consumer_PubSub1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost(""); //设置ip地址。默认为127.0.0.1factory.setPort(5672);              //端口 默认值5672factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/factory.setUsername("yhy");        //用户名,默认值guestfactory.setPassword("");     //密码,默认值guest//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();String queue1Name="test_fanout_queue1";String queue2Name="test_fanout_queue2";/** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数:*   1.队列名称*   2.autoAck:是否自动确认*   3.callback:回调对象* *///6.接收消息Consumer consumer=new DefaultConsumer(channel){/** 回调方法,当收到消息后,会自动执行该方法* 1.consumerTag:标识* 2.envelope :获取一些信息,交换机,路由key...* 3.properties: 配置信息* 4.body: 数据* */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台......");}};channel.basicConsume(queue1Name,true,consumer);//不需要关闭资源}
}

 控制台输出有

Routing路由模式

对于特定级别的信息会发送到别的队列,如上图的error,在发送消息时也会有一个routing,只要和后面的队列对应上就可以发送到对应队列。 

 生产者代码:

//发送消息
public class producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost(""); //设置ip地址。默认为127.0.0.1factory.setPort(5672);              //端口 默认值5672factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/factory.setUsername("yhy");        //用户名,默认值guestfactory.setPassword("");     //密码,默认值guest//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();/** exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)* 参数:* 1.exchange  : 交换价名称* 2.type      : 交换机类型 ,有四种*               DIRECT("direct"),  定向FANOUT("fanout"),   扇形(广播),发送消息到每一个与之绑定队列TOPIC("topic"),     通配符的方式HEADERS("headers"); 参数匹配*3.durable  :是否持久化* 4.autoDelete:是否自动删除* 5.internal: 内部使用。一般false* 6.arguments:参数* *///5.创建交换机String exchangeName="test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);//6.创建队列String queue1Name="test_direct_queue1";String queue2Name="test_direct_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);/** queueBind(String queue, String exchange, String routingKey)* 参数:* queue:队列名* exchange:交换机名称* routingKey:路由键,绑定规则*   如果交换机类型为fanout,routingKey设置为""* *///7.绑定队列和交换机//队列1绑定errorchannel.queueBind(queue1Name,exchangeName,"error");//队列2绑定error,info,warningchannel.queueBind(queue2Name,exchangeName,"info");channel.queueBind(queue2Name,exchangeName,"error");channel.queueBind(queue2Name,exchangeName,"warning");String body="日志信息:调用了findAll方法,级别:info,error,warning";//8.发送消息channel.basicPublish(exchangeName,"error",null,body.getBytes());//9.释放资源channel.close();connection.close();}
}

消费者代码(两个消费者就绑定队列名不一样):

public class consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost(""); //设置ip地址。默认为127.0.0.1factory.setPort(5672);              //端口 默认值5672factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/factory.setUsername("yhy");        //用户名,默认值guestfactory.setPassword("");     //密码,默认值guest//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();String queue1Name="test_direct_queue1";String queue2Name="test_direct_queue2";/** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数:*   1.队列名称*   2.autoAck:是否自动确认*   3.callback:回调对象* *///6.接收消息Consumer consumer=new DefaultConsumer(channel){/** 回调方法,当收到消息后,会自动执行该方法* 1.consumerTag:标识* 2.envelope :获取一些信息,交换机,路由key...* 3.properties: 配置信息* 4.body: 数据* */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);System.out.println("body:"+new String(body));System.out.println("将日志信息存储到数据库");}};channel.basicConsume(queue1Name,true,consumer);//不需要关闭资源}
}

Topics通配符模式 

发送消息时设定的routingkey会和后面的routingkey进行匹配。

生产者代码:

//发送消息
public class producer_Topic {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost(""); //设置ip地址。默认为127.0.0.1factory.setPort(5672);              //端口 默认值5672factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/factory.setUsername("yhy");        //用户名,默认值guestfactory.setPassword("");     //密码,默认值guest//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();/** exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)* 参数:* 1.exchange  : 交换价名称* 2.type      : 交换机类型 ,有四种*               DIRECT("direct"),  定向FANOUT("fanout"),   扇形(广播),发送消息到每一个与之绑定队列TOPIC("topic"),     通配符的方式HEADERS("headers"); 参数匹配*3.durable  :是否持久化* 4.autoDelete:是否自动删除* 5.internal: 内部使用。一般false* 6.arguments:参数* *///5.创建交换机String exchangeName="test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);//6.创建队列String queue1Name="test_topic_queue1";String queue2Name="test_topic_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);/** queueBind(String queue, String exchange, String routingKey)* 参数:* queue:队列名* exchange:交换机名称* routingKey:路由键,绑定规则*   如果交换机类型为fanout,routingKey设置为""* *///7.绑定队列和交换机// routing key 系统的名称.日志的级别。//需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库channel.queueBind(queue1Name,exchangeName,"#.error");channel.queueBind(queue1Name,exchangeName,"order.*");channel.queueBind(queue2Name,exchangeName,"*.*");String body="日志信息:调用了findAll方法";//8.发送消息channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());//9.释放资源channel.close();connection.close();}
}

 消费者代码

public class consumer_Topic1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory=new ConnectionFactory();//2.设置参数factory.setHost(""); //设置ip地址。默认为127.0.0.1factory.setPort(5672);              //端口 默认值5672factory.setVirtualHost("/itcast");  //设置虚拟机 默认值/factory.setUsername("yhy");        //用户名,默认值guestfactory.setPassword("");     //密码,默认值guest//3.创建连接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();String queue1Name="test_topic_queue1";String queue2Name="test_topic_queue2";/** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数:*   1.队列名称*   2.autoAck:是否自动确认*   3.callback:回调对象* *///6.接收消息Consumer consumer=new DefaultConsumer(channel){/** 回调方法,当收到消息后,会自动执行该方法* 1.consumerTag:标识* 2.envelope :获取一些信息,交换机,路由key...* 3.properties: 配置信息* 4.body: 数据* */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);System.out.println("body:"+new String(body));System.out.println("将日志信息存储到数据库");}};channel.basicConsume(queue1Name,true,consumer);//不需要关闭资源}
}

 工作模式总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/5301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安全防御 --- DDOS攻击(01)

DOS攻击&#xff08;deny of service&#xff09;--- 拒绝式服务攻击 例&#xff1a;2016年10月21日&#xff0c;美国提供动态DNS服务的DynDNS遭到DDOS攻击&#xff0c;攻击导致许多使用DynDNS服务的网站遭遇访问问题&#xff0c;此事件中&#xff0c;黑客人就是运用了DNS洪水…

golang waitgroup

案例 WaitGroup 可以解决一个 goroutine 等待多个 goroutine 同时结束的场景&#xff0c;这个比较常见的场景就是例如 后端 worker 启动了多个消费者干活&#xff0c;还有爬虫并发爬取数据&#xff0c;多线程下载等等。 我们这里模拟一个 worker 的例子 package mainimport (…

ChatGPT与Claude对比分析

一 简介 1、ChatGPT: 访问地址&#xff1a;https://chat.openai.com/ 由OpenAI研发,2022年11月发布。基于 transformer 结构的大规模语言模型,包含1750亿参数。训练数据集主要是网页文本,聚焦于流畅的对话交互。对话风格友好,回复通顺灵活,富有创造性。存在一定的安全性问题,可…

【深度学习】基于BRET的高级主题检测

一、说明 使用BERT,UMAP和HDBSCAN捕获文档主题,紧随最先进的BERTopic架构(transformer编码器)。 主题检测是一项 NLP 任务,旨在从文本文档语料库中提取全局“主题”。例如,如果正在查看书籍描述的数据集,主题检测将使我们能够将书籍分类,例如:“浪漫”、“科幻”、“旅…

Springboot+Flask+Neo4j+Vue2+Vuex+Uniapp+Mybatis+Echarts+Swagger综合项目学习笔记

文章目录 Neo4j教程&#xff1a;Neo4j高性能图数据库从入门到实战 医疗问答系统算法教程&#xff1a;医学知识图谱问答系统项目示例&#xff1a;neo4j知识图谱 Vueflask 中药中医方剂大数据可视化系统可视化技术&#xff1a;ECharts、D.jsflask教程&#xff1a;速成教程Flask w…

list模拟实现

一、结点的定义 有三个成员&#xff0c;2个指向前面和后面的指针&#xff0c;一个表示结点存储T类型的值。 对于_prev和_next&#xff0c;类型是 list_node<T>*&#xff0c;不是list_node*&#xff0c;加上类型参数T之后&#xff0c;才是模板类的类型。 构造函数中&am…

【MySQL】MySQL8.1.0版本正式发布带来哪些新特性?

文章目录 前言一、畅谈新版本二、8.1.0版本部署2.1、环境准备2.2、配置yum安装依赖2.3、用户及目录创建2.4、创建用户及组2.5、解压缩包2.6、环境变量配置2.7、创建参数文件2.8、数据库初始化2.9、启动Mysql2.10、登陆MySQL 8.1 三、新特性3.1、密码参数3.2、错误日志加强3.3、…

Spring Security OAuth2.0(6):自定义认证自定义登录页

文章目录 自定义登录界面配置自定义登录页面 自定义登录界面 \qquad 你可能想知道登录页面从哪里来&#xff1f;因为我们并没有提供任何的HTML或JSP文件。Spring Security 的默认配置没有明确设定一个登录页面的URL&#xff0c;因此Spring Security 会根据启用的功能自动生成一…

Godot实用代码-存取存档的程序设计

1. Settings.gd 全局变量 用于保存玩家设置 对应Settings.json 2. Data.gd 全局变量 用于保存玩具数据 对应Data.json 实践逻辑指南 1.在游戏开始的时候&#xff08;游戏场景入口的_ready()处&#xff0c; Settings.gd

Linux内核结构与特性简介

系统调用接口&#xff1a;位于最上层&#xff0c;实现了一些基本的功能&#xff0c;如read和write等系统调用。这是用户空间程序与内核交互的接口&#xff0c;提供了对内核功能的访问。 内核代码&#xff1a;位于系统调用接口之下&#xff0c;可以看作是独立于体系结构的通用内…

qt和vue交互

1、首先在vue项目中引入qwebchannel /******************************************************************************** Copyright (C) 2016 The Qt Company Ltd.** Copyright (C) 2016 Klarlvdalens Datakonsult AB, a KDAB Group company, infokdab.com, author Milian …

13_Linux无设备树Platform设备驱动

目录 Linux驱动的分离与分层 驱动的分隔与分离 驱动的分层 platform平台驱动模型简介 platform总线 platform驱动 platform设备 platform设备程序编写 platform驱动程序编写 测试APP编写 运行测试 Linux驱动的分离与分层 像I2C、SPI、LCD 等这些复杂外设的驱动就不…

Fortinet Accelerate 2023·中国区巡展收官丨让安全成就未来

7月18日&#xff0c;2023 Fortinet Accelerate Summit在上海成功举办&#xff01;这亦象征着“Fortinet Accelerate2023中国区巡展”圆满收官。Fortinet携手来自多个典型行业的百余位代表客户&#xff0c;以及Telstra - PBS 太平洋电信、Tenable等多家生态合作伙伴&#xff0c;…

利用数据分析告警机制,实现鸿鹄与飞书双向集成

需求描述 实现鸿鹄与飞书的双向集成&#xff0c;依赖鸿鹄的告警机制&#xff0c;可以发送用户关心的信息到飞书。同时依赖飞书强大的卡片消息功能&#xff0c;在飞书消息里面能够通过链接&#xff08;如下图&#xff09;返回到鸿鹄以方便用户进一步排查和分析问题。 解决方案 1…

CGT Asia嘉年华|2023第四届亚洲细胞与基因治疗 创新峰会(广州站)10月升级启航

近年来&#xff0c;全球CGT发展突飞猛进&#xff0c;为遗传罕见病、难治性慢性病和肿瘤患者带来了新的希望&#xff0c;也成为整个国际领域科技竞争的未来焦点。国家发改委发布的《“十四五”生物经济发展规划》明确指出要重点发展基因诊疗、干细胞治疗、免疫细胞治疗等新技术&…

利用鸿鹄优化共享储能的SCADA 系统功能,赋能用户数据自助分析

摘要 本文主要介绍了共享储能的 SCADA 系统大数据架构&#xff0c;以及如何利用鸿鹄来更好的优化 SCADA 系统功能&#xff0c;如何为用户进行数据自助分析赋能。 1、共享储能介绍 说到共享储能&#xff0c;可能不少朋友比较陌生&#xff0c;下面我们简单介绍一下共享储能的价值…

Python高光谱遥感数据处理与高光谱遥感机器学习方法深度应用

目录 ​第一章 高光谱基础 第二章 高光谱开发基础&#xff08;Python&#xff09; 第三章 高光谱机器学习技术&#xff08;python&#xff09; 第四章 典型案例操作实践 更多推荐 本教程提供一套基于Python编程工具的高光谱数据处理方法和应用案例。 涵盖高光谱遥感的基础…

2023年7月18日,File类,IO流,线程

File类 1. 概述 File&#xff0c;是文件和目录路径的抽象表示 File只关注文件本身的信息&#xff0c;而不能操作文件里的内容 。如果需要读取或写入文件内容&#xff0c;必须使用IO流来完成。 在Java中&#xff0c;java.io.File 类用于表示文件或目录的抽象路径名。它提供了一…

selenium.chrome怎么写扩展拦截或转发请求?

Selenium WebDriver 是一组开源 API&#xff0c;用于自动测试 Web 应用程序&#xff0c;利用它可以通过代码来控制chrome浏览器&#xff01; 有时候我们需要mock接口的返回&#xff0c;或者拦截和转发请求&#xff0c;今天就来实现这个功能。 代码已开源&#xff1a; https:/…

HTML语法

文章目录 前言HTML 文件基本结构常见标签标签种类特殊符号图片链接a链接 双标签链接 列表表格 &#xff1a;表单多行文本域: 前言 HTML是有标签组成的 <body>hello</body>大部分标签成对出现. 为开始标签, 为结束标签. 少数标签只有开始标签, 称为 “单标签”. 开…