RabbitMQ - 简单案例

目录

0.引用

1.Hello world

2.轮训分发消息

  2.1 抽取工具类

  2.2 启动两个工作线程接受消息

  2.4 结果展示

3.消息应答

  3.1 自动应答

  3.2 手动消息应答的方法

   3.3 消息自动重新入队

  3.4 消息手动应答代码

4.RabbitMQ 持久化

  4.1 队列如何实现持久化

  4.2 消息实现持久化

 5.不公平分发

6.预取值分发


0.引用

https://note.oddfar.com/rabbitmq/

1.Hello world

  1.1 依赖引用

<dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency>
</dependencies>

  1.2 消息生产者

package com.example.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();/*** 生成一个队列* 1.QUEUE_NAME 队列名称* 2.durable 队列里面的消息是否持久化 也就是是否用完就删除* 3.exclusive 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.autoDelete是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/Boolean durable = true;Boolean exclusive = false;Boolean autoDelete = false;Map<String, Object> arguments = null;channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete, null);String message = "hello world";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完毕");}}

  1.3 消息消费者

package com.example.one;
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息.........");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断");};/*** 消费者消费消息 - 接受消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调* 4.消息被取消时的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}}

2.轮训分发消息

  2.1 抽取工具类

package com.example.utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}

  2.2 启动两个工作线程接受消息

package com.example.two;import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消息接受DeliverCallback deliverCallback = (consumerTag, delivery) -> {String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:" + receivedMessage);};//消息被取消CancelCallback cancelCallback = (consumerTag) -> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费.................. ");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

选中 Allow multiple instances

image-20210627125840217

 启动后

image-20210627130146584

   2.3 启动一个发送消息线程

public class Task01 {public static final String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完成:" + message);}}
}

  2.4 结果展示

        通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息

3.消息应答

  3.1 自动应答

        消息发送后立即被认为已经传送成功

  3.2 手动消息应答的方法

  • Channel.basicAck(用于肯定确认)
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认)

Multiple 的解释:

        手动应答的好处是可以批量应答并且减少网络拥堵

  •  true 代表批量应答 channel 上未应答的消息
  • false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

RabbitMQ-00000018

   3.3 消息自动重新入队

  3.4 消息手动应答代码

        消费者在上面代码的基础上增加了以下内容

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

4.RabbitMQ 持久化

  4.1 队列如何实现持久化

//让队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);

  4.2 消息实现持久化

        需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性

RabbitMQ-00000028

 5.不公平分发

  为了避免这种情况,在消费者中消费之前,我们可以设置参数 channel.basicQos(1);

//不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

6.预取值分发

        本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7.1 动手实现AlexNet

AlexNet引入了dropput层 代码 import torch from torch import nn from d2l import torch as d2lnet nn.Sequential(# 样本数为1,通道数为96,11x11的卷积核,步幅为4&#xff0c;减少输出的高度和深度。 LeNet的通道数才6&#xff0c;此处96&#xff0c;为什么要增加这么多通…

MIT 6.830数据库系统 -- lab six

MIT 6.830数据库系统 -- lab six 项目拉取引言steal/no-force策略redo log与undo log日志格式和检查点 开始回滚练习1&#xff1a;LogFile.rollback() 恢复练习2&#xff1a;LogFile.recover() 测试结果疑问点分析 项目拉取 原项目使用ant进行项目构建&#xff0c;我已经更改为…

微服务技术栈(1.0)

微服务技术栈 认识微服务 单体架构 单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署 优点&#xff1a; 架构简单部署成本低 缺点&#xff1a; 耦合度高 分布式架构 分布式架构&#xff1a;根据业务功能对系统进行拆分&#xff0c…

如何在 Spring Boot 中集成日志框架 SLF4J、Log4j

文章目录 具体步骤附录 笔者的操作环境&#xff1a; Spring Cloud Alibaba&#xff1a;2022.0.0.0-RC2 Spring Cloud&#xff1a;2022.0.0 Spring Boot&#xff1a;3.0.2 Nacos 2.2.3 Maven 3.8.3 JDK 17.0.7 IntelliJ IDEA 2022.3.1 (Ultimate Edition) 具体步骤 因为 …

Java课题笔记~ 使用 Spring 的事务注解管理事务(掌握)

通过Transactional 注解方式&#xff0c;可将事务织入到相应 public 方法中&#xff0c;实现事务管理。 Transactional 的所有可选属性如下所示&#xff1a; propagation&#xff1a;用于设置事务传播属性。该属性类型为 Propagation 枚举&#xff0c; 默认值为 Propagation.R…

ESP32 Max30102 (3)修复心率误差

1. 运行效果 2. 新建修复心率误差.py 代码如下: from machine import sleep, SoftI2C, Pin, Timer from utime import ticks_diff, ticks_us from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM from hrcalc import calc_hr_and_spo2BEATS = 0 # 存储心率 FINGER_F…

如何识别手机是否有灵动岛(dynamic island)

如何识别手机是否有灵动岛&#xff08;dynamic island&#xff09; 灵动岛是苹果2022年9月推出的iPhone 14 Pro、iPhone 14 Pro Max首次出现&#xff0c;操作系统最低是iOS16.0。带灵动岛的手机在竖屏时顶部工具栏大于等于51像素。 #define isHaveDynamicIsland ({ BOOL isH…

微信小程序的项目解构

视频链接 黑马程序员前端微信小程序开发教程&#xff0c;微信小程序从基础到发布全流程_企业级商城实战(含uni-app项目多端部署)_哔哩哔哩_bilibili 接口文档 https://www.escook.cn/docs-uni-shop/mds/1.start.html 1&#xff1a;微信小程序宿主环境 1&#xff1a;常见的宿…

安达发制造工业迈向智能化:APS高级计划排程助力提升生产效率

随着市场竞争的加剧&#xff0c;制造企业纷纷寻求提高生产效率和降低成本的方法。近年来&#xff0c;越来越多的制造企业开始采用APS(高级计划与排程)系统&#xff0c;以优化生产计划和排程&#xff0c;提高生产效率&#xff0c;并在竞争中取得优势。 现代制造业通常面临复杂的…

【第一阶段】kotlin的range表达式

range:范围&#xff1a;从哪里到哪里的意思 in:表示在 !in&#xff1a;表示不在 … :表示range表达式 代码示例&#xff1a; fun main() {var num:Int20if(num in 0..9){println("差劲")}else if(num in 10..59){println("不及格")}else if(num in 60..89…

c语言每日一练(3)

前言&#xff1a;每日一练系列&#xff0c;每一期都包含5道选择题&#xff0c;2道编程题&#xff0c;博主会尽可能详细地进行讲解&#xff0c;令初学者也能听的清晰。每日一练系列会持续更新&#xff0c;暑假时三天之内必有一更&#xff0c;到了开学之后&#xff0c;将看学业情…

MySQL数据库的操作

MySQL 连接服务器 库的操作创建数据库数据库删除查看数据库进入数据库查看所在的数据库修改数据库显示创建语句查看连接情况 表的操作创建表查看数据库所有的表查看表的详细信息查看创建表时的详细信息删除表修改表名向表中插入数据在表结构中新增一列对表结构数据的修改删除表…

std::string 的append方法 存放文本和非文本数据

今天在用std::string来拼接数据 有文本数据 也有 非文本数据 如果是文本数据那么append方法参数为 ( char *data, int len&#xff09; 将data的前len个字节附加到 string中 如果是非文本数据 则参数为&#xff08;int size, char data&#xff09;; 重复size个data 附加…

【技巧】如何保护PowerPoint不被改动?

PPT&#xff0c;也就是PowerPoint&#xff0c;是很多小伙伴在工作生活中经常用到的图形演示文稿软件。 做好PPT后&#xff0c;担心自己不小心改动了或者不想他人随意更改&#xff0c;我们可以如何保护PPT呢&#xff1f;下面小编就来分享两个常用的方法&#xff1a; 1. 将PPT改…

STM32 4G学习(二)

特性参数 ATK-IDM750C是正点原子开发的一款高性能4G Cat1 DTU产品&#xff0c;支持移动4G、联通4G和电信4G手机卡。 它以高速率、低延迟和无线数传作为核心功能&#xff0c;可快速解决应用场景下的无线数传方案。 它支持TCP/UDP/HTTP/MQTT/DNS/RNDIS/NTP协议&#xff0c;支持…

ASCP系列电气防火限流式保护器在养老院的应用-安科瑞黄安南

摘要&#xff1a;2020年&#xff0c;我国65岁及以上老年人口数量为1.91亿&#xff0c;老龄化率达到13.5%。总体来看&#xff0c;大部分省市的养老机构数量还较少。养老设施的建设与民生息息相关&#xff0c;养老院的电气安全也非常重要。如果发生电气火灾&#xff0c;对于行动不…

lab7 proxylab

前情提要&#xff0c;如果看了书本&#xff0c;这个lab难度不高&#xff0c;但是如果不看书&#xff0c;难度还是挺高的&#xff0c;并且这个lab会用到cachelab中学到的东西&#xff0c;需要阅读 第十章&#xff1a;系统编程第十一章&#xff1a;网络编程第十二章&#xff1a;…

Licheepi Nano屏幕驱动并输出打印信息

Licheepi Nano买回来好长时间&#xff0c;没咋玩&#xff0c;最近看了一个利用F1C100S自制迷你电脑的博客&#xff0c;里面主要参考的就是Licheepi Nano。我打算先在Licheepi Nano上完成屏幕操作、Debian文件系统和USB键盘等内容&#xff0c;这里介绍怎样利用Licheepi Nano外接…

Oracle单实例升级补丁

目录 1.当前DB环境2.下载补丁包和opatch的升级包3.检查OPatch的版本4.检查补丁是否冲突5.关闭数据库实例&#xff0c;关闭监听6.应用patch7.加载变化的SQL到数据库8.ORACLE升级补丁查询 oracle19.3升级补丁到19.18 1.当前DB环境 [oraclelocalhost ~]$ cat /etc/redhat-releas…

记录--说一说css的font-size: 0

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 平常我们说的font-size&#xff1a;0&#xff1b;就是设置字体大小为0对吧&#xff0c;但是它的用处不仅仅如此哦&#xff0c;它还可以消除子行内元素间额外多余的空白&#xff01; 问题描述&#xff…