使用 Redis 如何实现延迟队列?

延迟消息队列在我们的日常工作中经常会被用到,比如支付系统中超过 30 分钟未支付的订单,将会被取消,这样就可以保证此商品库存可以释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖,等等诸如此类的业务场景都需要使用到延迟消息队列,又因为它在业务中比较常见,因此这个知识点在面试中也会经常被问到。

我们本文的面试题是,使用 Redis 如何实现延迟消息队列?

典型回答

延迟消息队列的常见实现方式是通过 ZSet 的存储于查询来实现,它的核心思想是在程序中开启一个一直循环的延迟任务的检测器,用于检测和调用延迟任务的执行,如下图所示: image.png ZSet 实现延迟任务的方式有两种,第一种是利用 zrangebyscore 查询符合条件的所有待处理任务,循环执行队列任务;第二种实现方式是每次查询最早的一条消息,判断这条信息的执行时间是否小于等于此刻的时间,如果是则执行此任务,否则继续循环检测。

方式一:zrangebyscore 查询所有任务 此实现方式是一次性查询出所有的延迟任务,然后再进行执行,实现代码如下:

import redis.clients.jedis.Jedis;
import utils.JedisUtils;import java.time.Instant;
import java.util.Set;/*** 延迟队列*/
public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延迟 30s 执行(30s 后的时间)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 继续添加测试数据jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 开启延迟队列doDelayQueue(jedis);}/*** 延迟队列消费* @param jedis Redis 客户端*/public static void doDelayQueue(Jedis jedis) throws InterruptedException {while (true) {// 当前时间Instant nowInstant = Instant.now();long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间long nowSecond = nowInstant.getEpochSecond();// 查询当前时间的所有任务Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for (String item : data) {// 消费任务System.out.println("消费:" + item);}// 删除已经执行的任务jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);Thread.sleep(1000); // 每秒轮询一次}}
}

以上程序执行结果如下:

消费:order2 消费:order3 消费:order4 消费:order5 消费:order_1

方式二:判断最早的任务 此实现方式是每次查询最早的一条任务,再与当前时间进行判断,如果任务执行时间大于当前时间则表示应该立即执行延迟任务,实现代码如下:

import redis.clients.jedis.Jedis;
import utils.JedisUtils;import java.time.Instant;
import java.util.Set;/*** 延迟队列*/
public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延迟 30s 执行(30s 后的时间)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 继续添加测试数据jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 开启延迟队列doDelayQueue2(jedis);}/*** 延迟队列消费(方式 2)* @param jedis Redis 客户端*/public static void doDelayQueue2(Jedis jedis) throws InterruptedException {while (true) {// 当前时间long nowSecond = Instant.now().getEpochSecond();// 每次查询一条消息,判断此消息的执行时间Set<String> data = jedis.zrange(_KEY, 0, 0);if (data.size() == 1) {String firstValue = data.iterator().next();// 消息执行时间Double score = jedis.zscore(_KEY, firstValue);if (nowSecond >= score) {// 消费消息(业务功能处理)System.out.println("消费消息:" + firstValue);// 删除已经执行的任务jedis.zrem(_KEY, firstValue);}}Thread.sleep(100); // 执行间隔}}
}

以上程序执行结果和实现方式一相同,结果如下:

消费:order2 消费:order3 消费:order4 消费:order5 消费:order_1

其中,执行间隔代码 Thread.sleep(100) 可根据实际的业务情况删减或配置。

考点分析

延迟消息队列的实现方法有很多种,不同的公司可能使用的技术也是不同的,我上面是从 Redis 的角度出发来实现了延迟消息队列,但一般面试官不会就此罢休,会借着这个问题来问关于更多的延迟消息队列的实现方法,因此除了 Redis 实现延迟消息队列的方式,我们还需要具备一些其他的常见的延迟队列的实现方法。

和此知识点相关的面试题还有以下这些:

  • 使用 Java 语言如何实现一个延迟消息队列?
  • 你还知道哪些实现延迟消息队列的方法?

知识扩展

Java 中的延迟消息队列

我们可以使用 Java 语言中自带的 DelayQueue 数据类型来实现一个延迟消息队列,实现代码如下:

public class DelayTest {public static void main(String[] args) throws InterruptedException {DelayQueue delayQueue = new DelayQueue();delayQueue.put(new DelayElement(1000));delayQueue.put(new DelayElement(3000));delayQueue.put(new DelayElement(5000));System.out.println("开始时间:" +  DateFormat.getDateTimeInstance().format(new Date()));while (!delayQueue.isEmpty()){System.out.println(delayQueue.take());}System.out.println("结束时间:" +  DateFormat.getDateTimeInstance().format(new Date()));}static class DelayElement implements Delayed {// 延迟截止时间(单面:毫秒)long delayTime = System.currentTimeMillis();public DelayElement(long delayTime) {this.delayTime = (this.delayTime + delayTime);}@Override// 获取剩余时间public long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Override// 队列里元素的排序依据public int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else {return 0;}}@Overridepublic String toString() {return DateFormat.getDateTimeInstance().format(new Date(delayTime));}}
}

以上程序执行的结果如下:

开始时间:2019-6-13 20:40:38 2019-6-13 20:40:39 2019-6-13 20:40:41 2019-6-13 20:40:43 结束时间:2019-6-13 20:40:43

此实现方式的优点是开发比较方便,可以直接在代码中使用,实现代码也比较简单,但它缺点是数据保存在内存中,因此可能存在数据丢失的风险,最大的问题是它无法支持分布式系统。

使用 MQ 实现延迟消息队列

我们使用主流的 MQ 中间件也可以方便的实现延迟消息队列的功能,比如 RabbitMQ,我们可以通过它的 rabbitmq-delayed-message-exchange 插件来实现延迟队列。

首先我们需要配置并开启 rabbitmq-delayed-message-exchange 插件,然后再通过以下代码来实现延迟消息队列。

配置消息队列:

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默认的交换机@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//参数二为类型:必须是 x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 绑定队列到交换器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();}
}

发送者实现代码如下:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("发送时间:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});}
}

从上述代码我们可以看出,我们配置 3s 之后再进行任务执行。

消费者实现代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收时间:" + sdf.format(new Date()));System.out.println("消息内容:" + msg);}
}

测试代码如下:

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat;
import java.util.Date;@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试}
}

以上程序的执行结果为:

发送时间:2020-06-11 20:47:51 接收时间:2018-06-11 20:47:54 消息内容:Hi Admin.

从上述结果中可以看出,当消息进入延迟队列 3s 之后才被正常消费,执行结果符合我的预期,RabbitMQ 成功的实现了延迟消息队列。

总结

本文我们讲了延迟消息队列的两种使用场景:支付系统中的超过 30 分钟未支付的订单,将会被自动取消,以此来保证此商品的库存可以正常释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖。并且我们讲了延迟队列的 4 种实现方式,使用 ZSet 的 2 种实现方式,以及 Java 语言中的 DelayQueue 的实现方式,还有 RabbitMQ 的插件 rabbitmq-delayed-message-exchange 的实现方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/545301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

适用响应式 Web UI 框架

1. BootstrapBootstrap是快速开发Web应用程序的前端工具包。它是一个CSS和HTML的集合&#xff0c;它使用了最新的浏览器技术&#xff0c;给你的Web开发提供了时尚的版式&#xff0c;表单&#xff0c;buttons&#xff0c;表格&#xff0c;网格系统等等。官方网站: http://twitte…

VC 忽略警告的方法

在vs2003, vs2005中用sprintf 会出现warning C4996: sprintf was declared deprecated或warning C4996: strcpy was declared deprecated或warning C4996: strcat was declared deprecated的警告。这里给出解决问题的一些方法。方法一&#xff1a;调用VS2005鼓吹的那些带“_s”…

第四章数据库安全性

第四章数据库安全性4.1_自主存取控制方法4.1.1_授权与回收4.2_数据库角色4.2.1_角色的创建4.2.2_给角色授权4.2.3_将一个角色授予其他的角色或用户4.2.4_角色权限的收回4.1_自主存取控制方法 4.1.1_授权与回收 1.GRANT: GRANT <权限>[,<权限>]… [ON <对象类型…

observable_Java Observable notifyObservers()方法与示例

observable可观察的类notifyObservers()方法 (Observable Class notifyObservers() method) Syntax: 句法&#xff1a; public void notifyObservers();public void notifyObservers(Object o);notifyObservers() method is available in java.util package. notifyObservers(…

Redis 面试题补充与汇总

前面的 12 个章节对 Redis 的面试题做了一个系统的讲解,那么本文将对 Redis 的热门面试题再做一个补充,力求覆盖到更多的 Redis 面试点。 Redis 持久化 Redis 持久化总共有以下三种方式: 快照方式(RDB, Redis DataBase)将某一个时刻的内存数据,以二进制的方式写入磁盘;…

第五章数据库完整性

第五章数据库完整性5.1_数据库完整性概述5.2_实体完整性5.2_参照完整性5.3_用户定义的完整性5.1_数据库完整性概述 1.数据库的完整性&#xff1a; 数据的正确性和相容性 2.数据的完整性和安全性的区别&#xff1a; 数据的完整性&#xff1a;防止数据库中存在不符合语义的数据&a…

KVC/KVO实现原理分析

2019独角兽企业重金招聘Python工程师标准>>> 1. 函数调用&#xff08;消息&#xff09;实现分析&#xff1a; 我们看这条语句&#xff1a; [代码]c#/cpp/oc代码&#xff1a; 1 [self.person setValue:"Vincent"forKey:"name"]; 就会被编译器…

英语笔记:写作:Nothing succeeds without a strong will

Nothing succeeds without a strong will 没有坚强的意志将一事无成 There is a widespread humorous saying that “Quitting smoking is theeasiest thing in the world. I’ve done it for hundreds of times.” 1. Funny as itis, the saying ironically reflects the fac…

filterreader_Java FilterReader markSupported()方法与示例

filterreaderFilterReader类markSupported()方法 (FilterReader Class markSupported() method) markSupported() method is available in java.io package. markSupported()方法在java.io包中可用。 markSupported() method is used to check whether this FilterReader strea…

如何设计不宕机的 Redis 高可用服务?

随着业务的不断发展和扩张我们需要更加稳定和高效的 Redis 服务,这是业务发展的必然趋势也是个人能力进阶的最高境界,我们需要一个高可用的 Redis 服务,来支撑和保证业务的正常运行。 我们本文的面试题是,如何设计一个不宕机的 Redis 高可用服务? 典型回答 想要设计一个…

第十章数据库恢复技术

第十章数据库恢复技术 10.1_事务 事务的四个特性&#xff1a;原子性、一致性、隔离性、持续性 10.2_故障的种类 事务内部的故障系统故障介质故障计算机病毒 10.3_恢复的实现技术 转储&#xff1a;动态和静态登记日志文件 10.4_恢复策略 事务故障的恢复&#xff1a;直接…

【SICP练习】22 练习1.28

&#xfeff;&#xfeff;练习1.28 这道题主要分为三个部分&#xff1a; 1、非平凡平方根&#xff0c;并添加到expmod函数中 2、类似于fermat-test的过程 3、通过已知的素数和非素数来检验 下面我们首先来写出能够在遇到非平凡平方根的时候报错的函数&#xff0c;在这个函数中&…

Oracle笔记:创建表空间、创建用户、授权

--回顾 --1、表空间 create tablespace ts datafile d:\123.dbf size 10m autoextend oncreate temporary tablespace temp_ts tempfile d:\temp123.dbf size 5m autoextend on--2、用户 create user testuser identified by test123 default tablespace ts temporary tablespa…

Java类类getClassLoader()方法及示例

类的类getClassLoader()方法 (Class class getClassLoader() method) getClassLoader() method is available in java.lang package. getClassLoader()方法在java.lang包中可用。 getClassLoader() method is used to return the ClassLoader that loads the class or interfac…

小白学算法:买卖股票的最佳时机!

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;本文已收录至 Github《小白学算法》系列&#xff1a;https://github.com/vipstone/algorith今天蚂蚁集团&#xff08;支付宝…

第十一章并发控制

第十一章并发控制11.1_并发控制概述11.2_封锁11.1_并发控制概述 事务是并发控制的基本单位 并发操作带来的数据不一致性包括丢失修改、不可重复读、读“赃”数据 11.2_封锁 两种封锁协议&#xff1a;排他锁&#xff08;写锁&#xff09;和共享锁&#xff08;读锁&#xff09…

《APUE》第6章笔记

这一章主要介绍了口令文件和组文件的结构和一些围绕这些结构的函数。 口令文件即passwd就是在/etc/passwd中可以查阅。其结构是&#xff1a; 上图四个平台能支持的就用黑点表示。 因为加密口令这一项放在passwd这个人人可读的文件中&#xff0c;可能会有安全问题。所以现在的Li…

Oracle笔记:备份还原

--------------备份还原-------------------- --物理备份 --逻辑备份 1&#xff09;传统工具&#xff1a; exp导出、imp导入 实质&#xff1a;调用sql指令&#xff0c;导入\导出数据&#xff0c;速度较慢 可以运行在客户端&#xff0c;也可以运行在服务器端&#xff0c; 在cmd窗…

Java DataOutputStream writeByte()方法与示例

DataOutputStream类writeByte()方法 (DataOutputStream Class writeByte() method) writeByte() method is available in java.io package. writeByte()方法在java.io包中可用。 writeByte() method is used to write the given value as one-byte value to the basic stream a…

聊聊近期的感受和10月文章精选!

先来看本月的原创文章汇总&#xff0c;其中算法部分也有少量 9 月份的文章&#xff0c;这样汇总起来大家看起来更方便&#xff0c;目录如下。算法系列小白学算法第1篇&#xff1a;一文详解「栈」和手撸栈的两种方法&#xff01;小白学算法第2篇&#xff1a;JDK 竟然是这样实现栈…