使用 Redis 如何实现延迟队列?

延迟消息队列在我们的日常工作中经常会被用到,比如支付系统中超过 30 分钟未支付的订单,将会被取消,这样就可以保证此商品库存可以释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖,等等诸如此类的业务场景都需要使用到延迟消息队列,又因为它在业务中比较常见,因此这个知识点在面试中也会经常被问到。

我们本文的面试题是,使用 Redis 如何实现延迟消息队列?

典型回答

延迟消息队列的常见实现方式是通过 ZSet 的存储于查询来实现,它的核心思想是在程序中开启一个一直循环的延迟任务的检测器,用于检测和调用延迟任务的执行,如下图所示: image.png ZSet 实现延迟任务的方式有两种,第一种是利用 zrangebyscore 查询符合条件的所有待处理任务,循环执行队列任务;第二种实现方式是每次查询最早的一条消息,判断这条信息的执行时间是否小于等于此刻的时间,如果是则执行此任务,否则继续循环检测。

方式一:zrangebyscore 查询所有任务 此实现方式是一次性查询出所有的延迟任务,然后再进行执行,实现代码如下:

import redis.clients.jedis.Jedis;
import utils.JedisUtils;import java.time.Instant;
import java.util.Set;/*** 延迟队列*/
public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延迟 30s 执行(30s 后的时间)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 继续添加测试数据jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 开启延迟队列doDelayQueue(jedis);}/*** 延迟队列消费* @param jedis Redis 客户端*/public static void doDelayQueue(Jedis jedis) throws InterruptedException {while (true) {// 当前时间Instant nowInstant = Instant.now();long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间long nowSecond = nowInstant.getEpochSecond();// 查询当前时间的所有任务Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for (String item : data) {// 消费任务System.out.println("消费:" + item);}// 删除已经执行的任务jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);Thread.sleep(1000); // 每秒轮询一次}}
}

以上程序执行结果如下:

消费:order2 消费:order3 消费:order4 消费:order5 消费:order_1

方式二:判断最早的任务 此实现方式是每次查询最早的一条任务,再与当前时间进行判断,如果任务执行时间大于当前时间则表示应该立即执行延迟任务,实现代码如下:

import redis.clients.jedis.Jedis;
import utils.JedisUtils;import java.time.Instant;
import java.util.Set;/*** 延迟队列*/
public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延迟 30s 执行(30s 后的时间)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 继续添加测试数据jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 开启延迟队列doDelayQueue2(jedis);}/*** 延迟队列消费(方式 2)* @param jedis Redis 客户端*/public static void doDelayQueue2(Jedis jedis) throws InterruptedException {while (true) {// 当前时间long nowSecond = Instant.now().getEpochSecond();// 每次查询一条消息,判断此消息的执行时间Set<String> data = jedis.zrange(_KEY, 0, 0);if (data.size() == 1) {String firstValue = data.iterator().next();// 消息执行时间Double score = jedis.zscore(_KEY, firstValue);if (nowSecond >= score) {// 消费消息(业务功能处理)System.out.println("消费消息:" + firstValue);// 删除已经执行的任务jedis.zrem(_KEY, firstValue);}}Thread.sleep(100); // 执行间隔}}
}

以上程序执行结果和实现方式一相同,结果如下:

消费:order2 消费:order3 消费:order4 消费:order5 消费:order_1

其中,执行间隔代码 Thread.sleep(100) 可根据实际的业务情况删减或配置。

考点分析

延迟消息队列的实现方法有很多种,不同的公司可能使用的技术也是不同的,我上面是从 Redis 的角度出发来实现了延迟消息队列,但一般面试官不会就此罢休,会借着这个问题来问关于更多的延迟消息队列的实现方法,因此除了 Redis 实现延迟消息队列的方式,我们还需要具备一些其他的常见的延迟队列的实现方法。

和此知识点相关的面试题还有以下这些:

  • 使用 Java 语言如何实现一个延迟消息队列?
  • 你还知道哪些实现延迟消息队列的方法?

知识扩展

Java 中的延迟消息队列

我们可以使用 Java 语言中自带的 DelayQueue 数据类型来实现一个延迟消息队列,实现代码如下:

public class DelayTest {public static void main(String[] args) throws InterruptedException {DelayQueue delayQueue = new DelayQueue();delayQueue.put(new DelayElement(1000));delayQueue.put(new DelayElement(3000));delayQueue.put(new DelayElement(5000));System.out.println("开始时间:" +  DateFormat.getDateTimeInstance().format(new Date()));while (!delayQueue.isEmpty()){System.out.println(delayQueue.take());}System.out.println("结束时间:" +  DateFormat.getDateTimeInstance().format(new Date()));}static class DelayElement implements Delayed {// 延迟截止时间(单面:毫秒)long delayTime = System.currentTimeMillis();public DelayElement(long delayTime) {this.delayTime = (this.delayTime + delayTime);}@Override// 获取剩余时间public long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Override// 队列里元素的排序依据public int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else {return 0;}}@Overridepublic String toString() {return DateFormat.getDateTimeInstance().format(new Date(delayTime));}}
}

以上程序执行的结果如下:

开始时间:2019-6-13 20:40:38 2019-6-13 20:40:39 2019-6-13 20:40:41 2019-6-13 20:40:43 结束时间:2019-6-13 20:40:43

此实现方式的优点是开发比较方便,可以直接在代码中使用,实现代码也比较简单,但它缺点是数据保存在内存中,因此可能存在数据丢失的风险,最大的问题是它无法支持分布式系统。

使用 MQ 实现延迟消息队列

我们使用主流的 MQ 中间件也可以方便的实现延迟消息队列的功能,比如 RabbitMQ,我们可以通过它的 rabbitmq-delayed-message-exchange 插件来实现延迟队列。

首先我们需要配置并开启 rabbitmq-delayed-message-exchange 插件,然后再通过以下代码来实现延迟消息队列。

配置消息队列:

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默认的交换机@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//参数二为类型:必须是 x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 绑定队列到交换器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();}
}

发送者实现代码如下:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("发送时间:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});}
}

从上述代码我们可以看出,我们配置 3s 之后再进行任务执行。

消费者实现代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收时间:" + sdf.format(new Date()));System.out.println("消息内容:" + msg);}
}

测试代码如下:

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat;
import java.util.Date;@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试}
}

以上程序的执行结果为:

发送时间:2020-06-11 20:47:51 接收时间:2018-06-11 20:47:54 消息内容:Hi Admin.

从上述结果中可以看出,当消息进入延迟队列 3s 之后才被正常消费,执行结果符合我的预期,RabbitMQ 成功的实现了延迟消息队列。

总结

本文我们讲了延迟消息队列的两种使用场景:支付系统中的超过 30 分钟未支付的订单,将会被自动取消,以此来保证此商品的库存可以正常释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖。并且我们讲了延迟队列的 4 种实现方式,使用 ZSet 的 2 种实现方式,以及 Java 语言中的 DelayQueue 的实现方式,还有 RabbitMQ 的插件 rabbitmq-delayed-message-exchange 的实现方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/545301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第五章数据库完整性

第五章数据库完整性5.1_数据库完整性概述5.2_实体完整性5.2_参照完整性5.3_用户定义的完整性5.1_数据库完整性概述 1.数据库的完整性&#xff1a; 数据的正确性和相容性 2.数据的完整性和安全性的区别&#xff1a; 数据的完整性&#xff1a;防止数据库中存在不符合语义的数据&a…

KVC/KVO实现原理分析

2019独角兽企业重金招聘Python工程师标准>>> 1. 函数调用&#xff08;消息&#xff09;实现分析&#xff1a; 我们看这条语句&#xff1a; [代码]c#/cpp/oc代码&#xff1a; 1 [self.person setValue:"Vincent"forKey:"name"]; 就会被编译器…

小白学算法:买卖股票的最佳时机!

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;本文已收录至 Github《小白学算法》系列&#xff1a;https://github.com/vipstone/algorith今天蚂蚁集团&#xff08;支付宝…

《APUE》第6章笔记

这一章主要介绍了口令文件和组文件的结构和一些围绕这些结构的函数。 口令文件即passwd就是在/etc/passwd中可以查阅。其结构是&#xff1a; 上图四个平台能支持的就用黑点表示。 因为加密口令这一项放在passwd这个人人可读的文件中&#xff0c;可能会有安全问题。所以现在的Li…

聊聊近期的感受和10月文章精选!

先来看本月的原创文章汇总&#xff0c;其中算法部分也有少量 9 月份的文章&#xff0c;这样汇总起来大家看起来更方便&#xff0c;目录如下。算法系列小白学算法第1篇&#xff1a;一文详解「栈」和手撸栈的两种方法&#xff01;小白学算法第2篇&#xff1a;JDK 竟然是这样实现栈…

第六章关系数据理论

第六章关系数据理论6.1_问题的提出&#xff08;略&#xff09;6.2_规范化6.2.1_函数依赖6.2.2_码6.2.3_范式6.2.4_2NF6.2.5_3NF6.2.6_BCNF6.2.7_多值依赖6.2.8_4NF6.2.9_规范化小结6.3_Armstrong公理系统6.3.1_函数依赖闭包6.3.2_最小依赖集6.3.3_转换为3NF的保持函数依赖的分解…

Github上的热门iOS开源项目:AFNetworking、MagicalRecord、BlocksKit以及XVim

1. AFNetworking AFNetworking是一个非常受欢迎的轻量级的iOS、Mac OS X网络通信类库。它建立在NSURLConnection、NSOperation以及其技术的基础上&#xff0c;有着精心设计的模块结构和功能丰富的API&#xff0c;让很多网络通信功能的实现变得十分简单。 附件&#xff1a;/c…

队列实现栈的3种方法,全都击败了100%的用户!

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;本文已收录至 Github《小白学算法》系列&#xff1a;https://github.com/vipstone/algorith之前我们讲过《用两个栈实现一个…

where、having、group by、order by、count的使用注意

where、having、group by、order by、count的使用注意1_where、having、group by、order by的顺序2_group by的作用3_where和group by的组合4_group by和having的组合5_where、having、group by的组合使用6_count与group by的组合使用1_where、having、group by、order by的顺序…

很实用的21个SQL小技巧!

前言每一个好习惯都是一笔财富&#xff0c;本文基于MySQL&#xff0c;分SQL后悔药&#xff0c; SQL性能优化&#xff0c;SQL规范优雅三个方向&#xff0c;分享写SQL的21个好习惯&#xff0c;谢谢阅读&#xff0c;加油哈~1. 写完SQL先explain查看执行计划&#xff08;SQL性能优化…

阿里最喜欢问的多线程顺序打印的5种解法!

Keeper导读大家在换工作面试中&#xff0c;除了一些常规算法题&#xff0c;还会遇到各种需要手写的题目&#xff0c;所以打算总结出来&#xff0c;给大家个参考。全文 2929 字&#xff0c;剩下的是代码&#xff0c;P6 及以下阅读只需要 8 分钟&#xff0c;高 P 请直接关闭第一篇…

CSS入门

CSS入门1_CSS概要1.1_CSS引入方式2_CSS选择器3_字体样式3.1_字体属性3.2_字体类型&#xff1a;font-family3.3_字体大小&#xff1a;font-size3.4_字体粗细&#xff1a;font-weight3.5_字体颜色&#xff1a;color3.6_总结4_文本样式4.1_文本样式属性4.2_首行缩进&#xff1a;te…

23张图!万字详解「链表」,从小白到大佬!

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;链表和数组是数据类型中两个重要又常用的基础数据类型。数组是连续存储在内存中的数据结构&#xff0c;因此它的优势是可以通…

何为“秒传”

写在前面 最近一直在弄文件传输的组件&#xff0c;在讨论组里面&#xff0c;有同事提到“秒传”的功能。在目前的通信软件中也有网盘的功能&#xff0c;就从网上搜了一下&#xff0c;这里对“秒传”的实现思路做一下总结&#xff0c;在之后会写一个小的demo实现一下。没有其他&…

数组转List的3种方法和使用对比!

作者 | 大脑补丁来源 | blog.csdn.net/x541211190/article/details/79597236前言&#xff1a;本文介绍Java中数组转为List三种情况的优劣对比&#xff0c;以及应用场景的对比&#xff0c;以及程序员常犯的类型转换错误原因解析。一.最常见方式&#xff08;未必最佳&#xff09;…

oracle10g备份导入

2019独角兽企业重金招聘Python工程师标准>>> //导出 exp test/testgdsoft filed:\gd.dmp //删用户 drop user wkwx cascade; //用PLSQL创建数据数据库 create user hzjzjn identified by hzjzjn default tablespace BDP_DATABD; grant CREATE USER,DROP USER,ALTE…

VisualSVNServer的使用

VisualSVNServer的使用1_服务端初识1.1_创建新仓库1.2_创建用户并分配权限1_服务端初识 1.1_创建新仓库 右击Repository&#xff0c;点击create 点击下一步 输入仓库名 右击空白处&#xff0c;点击新建&#xff0c;点击project structure 输入工程名 1.2_创建用户并分…

安利一个IDEA骚操作:一键生成方法的序列图

在平时的学习/工作中&#xff0c;我们会经常面临如下场景&#xff1a;阅读别人的代码阅读框架源码阅读自己很久之前写的代码。千万不要觉得工作就是单纯写代码&#xff0c;实际工作中&#xff0c;你会发现你的大部分时间实际都花在了阅读和理解已有代码上。为了能够更快更清晰地…

c#中textbox属性_C#.Net中带有示例的TextBox.Multiline属性

c#中textbox属性Here we are demonstrating use of Multiline property of the TextBox Control. 在这里&#xff0c;我们演示了TextBox控件的Multiline属性的使用。 Multiline property contains two values: 多行属性包含两个值&#xff1a; True: We can enter text in mo…

MATLAB新手教程

MATLAB新手教程 1&#xff0e;MATLAB的基本知识 1-1、基本运算与函数 在MATLAB下进行基本数学运算&#xff0c;仅仅需将运算式直接打入提示号&#xff08;>>&#xff09;之後&#xff0c;并按入Enter键就可以。比如&#xff1a; >> (5*21.3-0.8)*10/25 an…