RabbitMQ-核心特性

已经不需要为RabbitMQ交换机的离去而感到伤心了,接下来登场的是RabbitMQ-核心特性!!!

文章目录

  • 核心特性
    • 消息过期机制
    • 消息确认机制
    • 死信队列

核心特性

消息过期机制

官方文档:https://www.rabbitmq.com/ttl.html
可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了
适用场景:清理过期数据

1)给队列中的所有消息指定过期时间

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
// args 指定参数
channel.queueDeclare(QUEUE_NAME, false, false, false, args);

如果在过期时间内,还没有消费者取消息,消息才会过期
注意,如果消息已经接收到,但是没确认,是不会过期的
消费者中给队列中所有消息设置过期时间:

public class TtlConsumer {private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] argv) throws Exception {// 创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建队列,指定消息过期参数Map<String, Object> args = new HashMap<String, Object>();args.put("x-message-ttl", 5000);// args 指定参数channel.queueDeclare(QUEUE_NAME, false, false, false, args);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义了如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};// 消费消息,会持续阻塞channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });}
}

2)给某条消息指定过期时间

// 给消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1000").build();
channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));

生产者给某条消息指定过期时间

public class TtlProducer {private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");
//        factory.setUsername();
//        factory.setPassword();
//        factory.setPort();// 建立连接、创建频道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 发送消息String message = "Hello World!";// 给消息指定过期时间AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1000").build();channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

消息确认机制

官方文档:https://www.rabbitmq.com/confirms.html
为保证消息成功被消费,rabbitmq提供了消息确认机制,当消费者收到消息后要给一个成功反馈:
●ack:消费成功
●nack:消费失败
●reject:拒绝
如果告诉 rabbitmq 服务器消费成功,服务器才会放心地移除消息。
支持配置 autoack,会自动执行 ack 命令,接收到消息立刻就成功了。
image.png

        channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag -> {});

一般情况,建议 autoack 改为 false,根据实际情况,去手动确认。
指定确认某条消息:

image.png

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

第二个参数 multiple 批量确认:是指是否要一次性确认所有的历史消息直到当前这条消息
指定拒绝某条消息:
image.png

channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);

第 3 个参数表示是否重新入队,可用于重试

死信队列

官方文档:https://www.rabbitmq.com/dlx.html
为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容错机制,即:失败的消息怎么处理?
死信:过期的消息,拒收的消息,消息队列满了,处理失败的消息的统称
死信队列:专门处理死信的队列
死信交换机:专门给死信队列转发消息的交换机
示例场景:
image.png
实现:
1)创建死信交换机和死信队列,并且绑定关系
2)给失败之后需要容错处理的队列绑定死信交换机
3)可以给要容错的队列指定死信之后的转发规则,死信应该再转发到哪个死信队列
4)可以通过程序来读取死信队列中的消息,从而进行处理
生产者代码:

package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.Scanner;public class DlxDirectProducer {//死信交换机private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";//工作交换机private static final String WORK_EXCHANGE_NAME = "direct2-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明死信交换机channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");// 创建laoban死信队列String queueName = "laoban_dlx_queue";channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "laoban");//创建waibao死信队列String queueName2 = "waibao_dlx_queue";channel.queueDeclare(queueName2, true, false, false, null);channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");DeliverCallback laobanDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [laoban] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback waibaoDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [waibao] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, false, laobanDeliverCallback, consumerTag -> {});channel.basicConsume(queueName2, false, waibaoDeliverCallback, consumerTag -> {});Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String userInput = scanner.nextLine();String[] strings = userInput.split(" ");if (strings.length < 1) {continue;}String message = strings[0];String routingKey = strings[1];channel.basicPublish(WORK_EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");}}}
}

消费者代码:

在这里插入代码片package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class DlxDirectConsumer {private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";private static final String WORK_EXCHANGE_NAME = "direct2-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(WORK_EXCHANGE_NAME, "direct");//小狗的死信要转发到waibao这个死信队列// 指定死信队列参数Map<String, Object> args = new HashMap<>();// 要绑定到哪个交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);// 指定死信要转发到哪个死信队列args.put("x-dead-letter-routing-key", "waibao");// 创建队列,随机分配一个队列名称String queueName = "xiaodog_queue";channel.queueDeclare(queueName, true, false, false, args);channel.queueBind(queueName, WORK_EXCHANGE_NAME, "xiaodog");//小猫的死信要转发到laoban这个死信队列Map<String, Object> args2 = new HashMap<>();args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);args2.put("x-dead-letter-routing-key", "laoban");// 创建队列,随机分配一个队列名称String queueName2 = "xiaocat_queue";channel.queueDeclare(queueName2, true, false, false, args2);channel.queueBind(queueName2, WORK_EXCHANGE_NAME, "xiaocat");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [xiaodog] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [xiaocat] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, false, xiaoyuDeliverCallback, consumerTag -> {});channel.basicConsume(queueName2, false, xiaopiDeliverCallback, consumerTag -> {});}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/823472.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu 20.04.06 PCL C++学习记录(二十五)

[TOC]PCL中点云分割模块的学习 学习背景 参考书籍&#xff1a;《点云库PCL从入门到精通》以及官方代码PCL官方代码链接,&#xff0c;PCL版本为1.10.0&#xff0c;CMake版本为3.16&#xff0c;可用点云下载地址 学习内容 使用渐进形态滤波器分割识别地面回波&#xff0c;即执…

【一竞技CS2】VP战队官宣签下electroNic取代mir

1、近日VP战队官宣签下electroNic&#xff0c;以取代阵容中的mir。 electroNic自己也表示&#xff1a;“VP是一支顶级队伍。阵容核心曾赢得Major冠军&#xff0c;所有队员都处于巅峰状态并且时刻准备着去争夺冠军。我们有着一样的雄心壮志。 此外我还对和Jame很感兴趣&#xf…

C++从入门到精通——const与取地址重载

const与取地址重载 前言一、const正常用法const成员函数问题const对象可以调用非const成员函数吗非const对象可以调用const成员函数吗const成员函数内可以调用其它的非const成员函数吗非const成员函数内可以调用其它的const成员函数吗总结 二、取地址及const取地址操作符重载概…

如何用Jenkins执行自动化测试构建

摘要 依据Jenkins官网介绍&#xff0c;Jenkins是一个流行的开源持续集成和交付工具&#xff0c;它提供了一个可扩展的插件生态系统&#xff0c;可以用于自动化构建、测试和部署软件项目。 本文介绍如何安装使用Jenkins、常见问题解决方案以及深入应用&#xff0c;为自动化测试…

艾迪比皮具携手工博科技SAP ERP公有云,打造数字化转型新标杆

4月1日&#xff0c;广州市艾迪比皮具有限公司&#xff08;以下简称“艾迪比”&#xff09;SAP S/4HANA Cloud Public Edition&#xff08;以下简称“SAP ERP公有云”&#xff09;项目正式启动。双方项目组领导、成员出席本次项目启动会&#xff0c;为未来项目的顺利实施打下坚实…

滚雪球学Java(74):深入理解JavaSE输入输出流:掌握数据流动的奥秘

咦咦咦&#xff0c;各位小可爱&#xff0c;我是你们的好伙伴 bug菌&#xff0c;今天又来给大家手把手教学Java SE系列知识点啦&#xff0c;赶紧出来哇&#xff0c;别躲起来啊&#xff0c;听我讲干货记得点点赞&#xff0c;赞多了我就更有动力讲得更欢哦&#xff01;所以呀&…

nginx学习记录-动静分离

1. 动静分离原理 我们在访问网站资源的时候&#xff0c;通常会将资源分成两种&#xff0c;一种是静态资源&#xff08;前端的固定界面&#xff0c;比如图片&#xff0c;html页面等&#xff09;&#xff0c;这些资源无需后台程序处理&#xff1b;另一种是动态资源&#xff0c;这…

分布式调度器

xxl-job介绍 xxl-job 是一个轻量级分布式任务调度框架&#xff0c;支持动态添加、修改、删除定时任务&#xff0c;支持海量任务分片执行&#xff0c;支持任务执行日志在线查看和分页查询&#xff0c;同时支持任务失败告警和重试机制&#xff0c;支持分布式部署和高可用。xxl-j…

阿里云、腾讯云、华为云优惠券领取入口整理汇总

阿里云、腾讯云、华为云作为国内领先的云服务提供商&#xff0c;一直以其稳定、高效、安全的服务赢得了广大用户的青睐。为了回馈用户&#xff0c;这些云平台经常会推出各种优惠活动&#xff0c;其中最为常见的便是优惠券。本文将为大家整理汇总阿里云、腾讯云、华为云优惠券的…

linux-centos虚拟机设置固定ip

环境准备 虚拟机版本&#xff1a;centos7 安装环境&#xff1a;vmware17 1、设置网络连接 虚拟机-设置-网络适配器-NAT模式 2、查看子网信息 编辑-虚拟网络编辑器-NAT模式-NAT设置 查看子网ip和网关ip 下一步要用 3、修改配置文件 vim /etc/sysconfig/network-scripts…

构建数据平台架构指导原则与平台核心组件说明

文章目录 前言什么是数据架构&#xff1f;数据架构如何帮助构建数据平台&#xff1f;数据平台核心组件数据源系统数据加载数据存储数据处理和转换提供使用数据的方式公共服务 前言 湖仓一体是最近几年非常流行的现代大数据架构&#xff0c;目前它已经成为设计数据平台架构的首…

Flask框架初探-如何在本机发布一个web服务并通过requests访问自己发布的服务-简易入门版

Flask框架初探 在接触到网络框架之前我其实一直对一个事情有疑惑&#xff0c;跨语言的API在需要传参的情况下究竟应该如何调用&#xff0c;之前做过的项目里&#xff0c;我用python做了一个代码使用一个算法得到一个结果之后我应该怎么给到做前端的同学或者同事&#xff0c;之前…

基于springboot 的医院信管系统

开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven…

SpringBoot新增菜品模块开发(事务管理+批量插入+主键回填)

需求分析与设计 一&#xff1a;产品原型 后台系统中可以管理菜品信息&#xff0c;通过 新增功能来添加一个新的菜品&#xff0c;在添加菜品时需要选择当前菜品所属的菜品分类&#xff0c;并且需要上传菜品图片。 新增菜品原型&#xff1a; 当填写完表单信息, 点击"保存…

【数据分析】AHP层次分析法

博主总结&#xff1a;根据每个方案x各准则因素权重累加结果 对比来选择目标。数据主观性强 简介 AHP层次分析法是一种解决多目标复杂问题的定性和定量相结合进行计算决策权重的研究方法。该方法将定量分析与定性分析结合起来&#xff0c;用决策者的经验判断各衡量目标之间能…

【数字人】AIGC技术引领数字人革命:从制作到应用到全景解析

AIGC技术引领虚拟数字人革命&#xff1a;从制作到应用的全景解析 一、AIGC技术为虚拟数字人注入智能灵魂二、AIGC型虚拟人制作流程实例分析1、采集数据2、建模3、内容生成 三、AIGC在虚拟数字人应用中的案例分析四、总结与展望 在科技的浪潮中&#xff0c;AIGC&#xff08;人工…

Ubuntu:VSCode中编译运行C++代码

版本&#xff1a;Ubuntu22.04.1 LTS 目录 1 安装VSCode并汉化 2 检查Ubuntu是否已经安装了 GCC 3 在VScode中安装C/C扩展 4 在VSCode中进行C/C配置 1 安装VSCode并汉化 安装VSCode&#xff08;参考之前博客Ubuntu&#xff1a;安装VSCode_ubuntu vscode-CSDN博客&#xff…

Linux sort/uniq/wc

文章目录 1. sort 排序将线程ID从大到小排序 2.uniq 临近去重3.wc word cnt 统计 1. sort 排序 将线程ID从大到小排序 grep -v是反向筛选&#xff0c;利用USER&#xff0c;排除掉首行 awk是打印第1 2列 sort -n是代码以数值大小做排序&#xff0c;不加的话会以字符排序。 -k是…

Gitee和Git学习笔记

Gitee和Git指令 Gitee提交代码方法1 先将仓库clone到本地&#xff0c;修改后再push到 Gitee 的仓库方法2 本地初始化一个仓库&#xff0c;设置远程仓库地址后再做push 切换分支下载代码通过git clone克隆仓库通过下载 ZIP 的方式下载代码 Git提交指令 解决本地库同时关联GitHub…

(C语言入门)复合类型、内存管理

目录 复合类型&#xff08;自定义类型&#xff09; 概述&#xff1a; 结构体变量的定义和初始化&#xff1a; 结构体成员的使用&#xff1a; 结构体做函数参数&#xff1a; 结构体值传参&#xff1a; 结构体地址传参&#xff1a; 共用体&#xff08;联合体&#xff09;&…