200、使用默认 Exchange 实现 P2P 消息 之 消息生产者(发送消息) 和 消息消费者(消费消息)

RabbitMQ 工作机制图:

Connection: 代表客户端(包括消息生产者和消费者)与RabbitMQ之间的连接。
Channel: 连接内部的Channel。channel:通道
Exchange: 充当消息交换机的组件。
Queue: 消息队列。
在这里插入图片描述

★ 消费消息

使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:

(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。

(2)通过Connection获取Channel。

(3)根据需要,调用Channel的queueDeclare()方法声明队列,如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。

(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,
该参数相当于JMS中的消息监听器。

这个 basicConsume()方法 相当于是异步消费。
而同步消费会出现阻塞情况,这就失去消息中间件存在的意义,所以先讲异步消费。

★ 发送消息

使用RabbitMQ Java Client依赖库开发消息生产者的大致步骤如下:

(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。

(2)通过Connection获取Channel。

(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。
如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。
Declare:声明、宣布

(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,
第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。

【注意】:虽然消息生产者与队列是完全隔离的, 但如果消息生产者不声明消息队列,那系统中就可能暂时还没有任何消息队列。

在这种情况下,消息生产者向Exchange发送的消息将不会分发给任何队列,这些消息直接就被丢弃了。

【备注】:为了保证消息生产者能将消息发送到指定队列,消息生产者需要声明消息队列,保证消息队列的存在。

**问题:**消息生产者 和 消息队列 是完全隔离的,但是生产者为什么还要声明消息队列?
**原因:**因为程序如果先运行消息生产者,后运行消费者,而声明消息队列的方法又只存在消费者那边,那么在先运行消息生产者时,就会因为还没有生成消息队列,所以生产者发送到exchange的消息,会因为没有对应的消息队列而被丢弃。

代码演示:

先创建一个普通的 maven 项目。
在这里插入图片描述
添加一些属性 和 RabbitMQ的依赖
在这里插入图片描述

在这里插入图片描述

创建消息消费者

把创建连接的代码封装到一个方法里去。
在这里插入图片描述

消费者的代码
在这里插入图片描述
在这里插入图片描述

注意:channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
在这里插入图片描述
这个演示已消费未确认的演示放最后



执行消费者
在这里插入图片描述
控制台查看
原本没有这个消息队列,通过调用Channel的queueDeclare()方法声明队列,如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列

一开始消费者声明的这个消息队列,这个是否独占的exclusive 参数我是写true,
所以下图的 myQueue01的 Features 就是 Excl

在这里插入图片描述
在这里插入图片描述

这个就是创建的消费者。
用于消费这个 myQueue01 消息队列的消费者。
在这里插入图片描述

后面把 exclusive 改成了 false,是因为后面的生产者,需要也声明这个 myQueue01 消息队列,而如果这个消息队列是 独占的,就没法声明了,所以改成 false
在这里插入图片描述
在这里插入图片描述

创建消息生产者

生产者发送完消息就会关闭资源
消费者则是一直启动着
在这里插入图片描述

测试

先启动消费者或者启动生产者都一样,因为生产者和消费者都有调用queueDeclare() 方法声明消息队列,所以不存在发送消息后没找到对应的消息队列而导致消息被丢弃的情况。

启动消费者
在这里插入图片描述

然后启动生产者
生产者发送消息
在这里插入图片描述
再看消费者,已经消费了一条消息了。
因为先启动消费者,所以生产者发送的消息马上被消费了,在控制台的队列就看不到了。
在这里插入图片描述

再测试:

先启动生产者
关闭消费者,然后启动生产者发送消息
可以看出消息已经生产发送到消息队列了
在这里插入图片描述
在这里插入图片描述

这一步的流程图
在这里插入图片描述

启动消费者消费消息
在这里插入图片描述

流程图:
在这里插入图片描述

已消费未确认

注意:channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
在这里插入图片描述
在这里插入图片描述

如图:因为 autoAck 为false , 所以消费者消费消息后没有进行确认。这里的 unacked 条数就为1.

如果改成 autoAck 为false ,那么消费者消费消息的代码,要加上确认消息的方法。
在这里插入图片描述
这个就是手动确认消息。
在这里插入图片描述

完整代码:

ConnectionUtil 连接工具类

在这里插入图片描述

package cn.ljh.app.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory =  new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}

P2PProducer 生产者

package cn.ljh.app.rabbitmq.producer;import cn.ljh.app.rabbitmq.consumer.P2PConsumer;
import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用默认的exchange
public class P2PProducer
{//(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。//(2)通过Connection获取Channel。//(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。//    如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。//(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,//    第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定//此处打算直接使用默认的Exchange来分发消息,因此无需声明 Exchange,只需声明队列channel.queueDeclare(P2PConsumer.QUEUE_NAME, true, false, false, null);String message = "生产者发送的消息的内容";//4、调用Channel的basicPublish()方法发送消息channel.basicPublish(""/*默认的 Exchange 没有名字,所以用空的字符串*/,P2PConsumer.QUEUE_NAME/*使用队列名作为路由key,表明该消息将会被路由到该队列*/,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}

P2PConsumer 消费者

package cn.ljh.app.rabbitmq.consumer;import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者
public class P2PConsumer
{// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下://(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。//(2)通过Connection获取Channel。//(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布//    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//常量public final static String QUEUE_NAME = "myQueue01";public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列//如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列//参数1:声明的队列名; 参数2:消息队列是否持久化//参数3:是否只允许该消息消费者消费该队列的消息,为true,则其他消费者在这个myQueue01队列消息积堆过多的情况下,也无法帮忙消费。//参数4:是否自动删除(如果为true,在该队列没消息的情况下,会自动删除该队列) 参数5:填写额外的参数channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(QUEUE_NAME/*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数   %s:输出字符串  %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmqtest</artifactId><version>1.0.0</version><name>rabbitmqtest</name><!--  属性  --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!--  依赖  --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency></dependencies></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/105297.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UGUI交互组件ScrollView

一.ScrollView的结构 对象说明Scroll View挂有Scroll Rect组件的主体对象Viewport滚动显示区域&#xff0c;有Image和mask组件Content显示内容的父节点&#xff0c;只有个Rect Transform组件Scrollbar Horizontal水平滚动条Scrollbar Vertical垂直滚动条 二.Scroll Rect组件的属…

c# xml 参数读取读取的简单使用

完整使用之测试参数的读取&#xff08;xml&#xff09; 保存一个xml文档&#xff08;如果没有就会生成一个默认的 里面的参数用的是我们默认设置的&#xff09;&#xff0c;之后每次更改里面的某项&#xff0c;然后保存 类似于重新刷新一遍。 这里所用的xml测试参数前面需要加…

【TA 挖坑04】薄膜干涉 镭射材质 matcap

镭射材质&#xff0c;相对物理的实现&#xff1f; 万物皆可镭射&#xff0c;个性吸睛的材质渲染技术 - 知乎 (zhihu.com) 薄膜干涉材质&#xff0c;matcap更trick的方法&#xff1f;matcapremap&#xff0c; MatCap原理介绍及应用 - 知乎 (zhihu.com) 庄懂的某节课也做了mat…

echarts折线图(其他图也是一样)设置tooltip自动滚动

按顺序自动滚动效果 <div class"leftComp-charts" id"chartsBox"></div>chartsData: {roadNorm: [],time: []},eChartsTimer: nullinitChartsBox() {this.option {tooltip: {trigger: "axis",axisPointer: {// 方法一type: "s…

数据结构--》掌握数据结构中的查找算法

当你需要从大量数据中查找某个元素时&#xff0c;查找算法就变得非常重要。 无论你是初学者还是进阶者&#xff0c;本文将为你提供简单易懂、实用可行的知识点&#xff0c;帮助你更好地掌握查找在数据结构和算法中的重要性&#xff0c;进而提升算法解题的能力。接下来让我们开启…

【深度学习】深度学习实验二——前馈神经网络解决上述回归、二分类、多分类、激活函数、优化器、正则化、dropout、早停机制

一、实验内容 实验内容包含要进行什么实验&#xff0c;实验的目的是什么&#xff0c;实验用到的算法及其原理的简单介绍。 1.1 手动实现前馈神经网络解决上述回归、二分类、多分类问题 分析实验结果并绘制训练集和测试集的loss曲线。 原理介绍&#xff1a;回归问题使用的损失函…

在命令行下使用Apache Ant

Apache Ant的帮助文档 离线帮助文档 在<ant的安装目录>/manual下是离线帮助文档 双击index.html可以看到帮助文档的内容&#xff1a; 在线帮助文档 最新发布版本的帮助文档https://ant.apache.org/manual/index.html Apache Ant的命令 ant命令行格式 ant [opt…

Uniapp 入门

创建项目 参考&#xff1a;uni-app创建新页面和页面的配置_uniapp多页面配置-CSDN博客 添加页面 添加路由 显示效果 网址&#xff1a;http://localhost:8080/#/pages/task/taskDetails 参考&#xff1a;uni-app官网 在 HBuilder X 使用命令行引入 uni-ui npm i dcloudio/un…

云开发校园宿舍/企业/部门/物业故障报修小程序源码

微信小程序云开发校园宿舍企业单位部门物业报修小程序源码&#xff0c;这是一款云开发校园宿舍报修助手工具系统微信小程序源码&#xff0c;适用于学校机房、公司设备、物业管理以及其他团队后勤部&#xff0c;系统为简单云开发&#xff0c;不需要服务器域名即可部署&#xff0…

电子电器架构——基于Adaptive AUTOSAR的电子电器架构简析

基于Adaptive AUTOSAR的电子电器架构简析 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明…

如何在.NET Core3.1 类库项目中使用System.Windows.Forms

网上说法大多都是直接添加对.Net Framework框架的引用&#xff0c;但是这种方法打包很不友好。于是开始了网络搜索&#xff0c;翻到了微软的文档&#xff0c;才找到直接引用 System.Windows.Froms 程序集的方法。还隐藏的很深&#xff0c;地址&#xff1a;Upgrade a Windows Fo…

【数据结构】:二叉树与堆排序的实现

1.树概念及结构(了解) 1.1树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的有一个特殊的结点&#…

想要精通算法和SQL的成长之路 - 滑动窗口和大小根堆

想要精通算法和SQL的成长之路 - 滑动窗口和大小根堆 前言一. 大小根堆二. 数据流的中位数1.1 初始化1.2 插入操作1.3 完整代码 三. 滑动窗口中位数3.1 在第一题的基础上改造3.2 栈的remove操作 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 大小根堆 先来说下大小根堆是什…

Qt 布局(QLayout 类QStackedWidget 类) 总结

一、QLayout类(基本布局) QLayout类是Qt框架中用于管理和排列QWidget控件的布局类。它提供了一种方便而灵活的方式来自动布局QWidget控件。QLayout类允许您以一种简单的方式指定如何安排控件&#xff0c;并能够自动处理控件的位置和大小&#xff0c;以使其适应更改的父窗口的大…

竞赛选题 深度学习OCR中文识别 - opencv python

文章目录 0 前言1 课题背景2 实现效果3 文本区域检测网络-CTPN4 文本识别网络-CRNN5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习OCR中文识别系统 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;…

【LeetCode刷题(数据结构)】:另一颗树的子树

给你两棵二叉树 root 和 subRoot 检验 root 中是否包含和 subRoot 具有相同结构和节点值的子树。如果存在&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 二叉树 tree 的一棵子树包括 tree 的某个节点和这个节点的所有后代节点。tree 也可以看做它自身的一棵子…

数据库安全-H2 databaseElasticsearchCouchDBInfluxdb漏洞复现

目录 数据库安全-H2 database&Elasticsearch&CouchDB&Influxdb 复现influxdb-未授权访问-jwt 验证H2database-未授权访问-配置不当CouchDB-权限绕过配合 RCE-漏洞CouchDB 垂直权限绕过Couchdb 任意命令执行 RCE ElasticSearch-文件写入&RCE-漏洞Elasticsearch写…

CentOS 挂载新磁盘以及磁盘扩容操作教程

1.搭载新加磁盘 # 查看磁盘 fdisk -l #新盘&#xff08;/dev/sdb&#xff09;创建分区 #虚拟机 fdisk /dev/sdb #阿里云 fdisk /dev/vdb #创建/dev/sdb1为新的PV&#xff08;物理卷&#xff09; 【创建物理卷命令】 #虚拟机 pvcreate /dev/sdb1 #阿里云 pvcreate /dev/vdb1 查…

【C语言】通讯录的简单实现

通讯录的内容 contect.h #pragma once // 包含头文件 #include <stdio.h> #include <string.h> #include <assert.h> #include <stdlib.h>// 使用枚举常量定义功能 enum Function {quit, // 注意这是逗号&#xff0c;不是分号save,addition,delete,s…

27 mysql 组合索引 的存储以及使用

前言 这里来看一下 mysql 中索引的 增删改查 查询在前面的系列文章中都有使用到 这里 来看一下 增删改 的相关实现 索引记录 和 数据记录 的处理方式是一致的 这里来看一下 组合索引 的相关, 以及 特性 组合索引的存储以及使用 创建数据表如下, 除了主键之外, 创建了…