Redis之zset在异步队列上的应用

当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。

一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训获取zset中的任务并进行处理。

需要提前考虑一个问题:

如何避免一个任务被多次处理?

一种解决方案是当多个线程获取到任务时,调用redis的zrem命令,将该任务从指定的zset中移除(利用了redis处理命令时是顺序执行的)。

环境

  • JDK17
  • 两个jar包
    • Jedis
    • fastjson2

代码

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import redis.clients.jedis.Jedis;import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;// 基于Redis实现的延迟队列
public class RedisDelayingQueue<T> {static class TaskItem<T> {public String id;public T msg;}// fastjson序列化对象时如果存在泛型,需要使用TypeReferenceprivate Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();private Jedis jedis;private String queueKey;public RedisDelayingQueue(Jedis jedis, String queueKey) {this.jedis = jedis;this.queueKey = queueKey;}// 将任务添加到 zset 中// 分数是延时的时间public void delay(T msg) {TaskItem<T> task = new TaskItem<T>();task.id = UUID.randomUUID().toString();task.msg = msg;// 序列化任务String s = JSON.toJSONString(task);jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s);}public void loop() {while(!Thread.interrupted()) {// 从zset中取出一个任务List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s = values.iterator().next();if(jedis.zrem(queueKey, s) > 0) {TaskItem<T> task = JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}public void handleMsg(T msg) {System.out.println(msg);}
}

优化

通过上面loop中代码,多个线程获取到values时,可能会被多个线程同时取到,然后再调用zrem命令去竞争的删除该值,所以会有很多无用的网络请求发送到redis。更容易想到的方案是将取值然后删除的操作变成原子性的,两种实现方案:

  • 通过对代码块进行加锁的方式
  • 利用redis中lua脚本的原子执行的特点

代码块加锁

这种方案不太好,如果两个命令之间发生了网络错误或者延迟,将造成其它线程的阻塞

    public void synchronizedLoop() {while(!Thread.interrupted()) {synchronized(this) {// 从zset中取出一个任务List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s = values.iterator().next();if(jedis.zrem(queueKey, s) > 0) {TaskItem<T> task = JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}}

Lua脚本

local key = KEYS[1]
local task = redis.call('ZPOPMIN', key)
if task and next(task) != nil thenredis.call('ZREM', key, task[1])return task[1]
elsereturn nil
end

通过查阅文档发现,ZRANGEBYSCORE从6 版本开始已经过时了,所以这里使用ZPOPMIN来获取分数最小的value,可以达到相同的效果。

通过Jedis的eval函数,调用redis执行lua脚本的命令。

    public void luaLoop() {while(!Thread.interrupted()) {Object ans = jedis.eval(script, 1, queueKey);if(ans != null) {String task = (String) ans;TaskItem<T> taskItem = JSON.parseObject(task, TaskType);this.handleMsg(taskItem.msg);}else{try{Thread.sleep(500);}catch(Exception e) {break;}}}}

为什么可以优化

  • 使用lua脚本的方式,使得一个线程如果zset中有任务都会成功获取任务,而不会多个线程同时拿到同一个任务,再去竞争删除,减少了无效的网络IO

测试程序

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;public class Main {public static void main(String[] args) {JedisPool jedisPool = new JedisPool("url-of-redis", 6379, "username", "pass");Jedis jedis = jedisPool.getResource();RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");// 创建一个线程充当生产者,并向redis中存10个异步任务Thread producer = new Thread() {public void run() {for (int i = 0; i < 10; i++) {queue.delay("codehole" + i);}}};// 创建一个线程充当消费者,不断从redis中取任务并执行Thread consumer = new Thread() {public void run() {queue.luaLoop();}};producer.start();consumer.start();try {// 等待生产者线程执行结束producer.join();Thread.sleep(6000);consumer.interrupt();consumer.join();}catch(InterruptedException e) {e.printStackTrace();}}
}

一些问题

这个问题是关于Jedis的问题,因为我通过上面的方式发起redis请求实际上是存在并发问题的,如果将上述代码中的延时去掉,这个问题发生的概率将大大发生,主要是因为Jedis不是线程安全的,换句话说,通过JedisPool获取redis连接的实例,并发访问是是通过同一个socket发送数据的。

这里使用时,最好是每个线程都用有一个Jedis的实例,避免数据竞争问题.这里只是用了两个线程,所以简单手动使用两个redis实例,如果有多个消费者存在的情况下,还是每个线程单独持有一个Jedis才能解决问题。

    private Jedis readJedis;private Jedis writeJedis;

总结

本篇文章记录了使用zset实现一个简单异步队列的过程,然后对于第一次实现存在的一个问题,使用lua或者锁的方式优化网络IO。使用锁的方式会降低程序的并发度,所以一般使用lua脚本的方式来实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/116908.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xcode Simulator 安装

xcode Simulator 安装 参考文档 xcode又又又升级了&#xff0c;升级完成之后不下载最新的 iOS 17 Simulator就不能编译运行了&#xff0c;只能静静的等他下载。但是离谱的是这个居然没有断点续下&#xff0c;每次都要重新下载&#xff0c;眼睁睁的看着下载了4个G然后断掉了从…

Day7力扣打卡

打卡记录 合法分组的最少组数&#xff08;贪心&#xff09; 链接 举例说明&#xff0c;假设 c n t [ x ] 32 cnt[x]32 cnt[x]32&#xff0c; k 10 k10 k10&#xff0c;那么 32 10 10 10 2 321010102 321010102&#xff0c;多出的 2 2 2 可以分成两个 1 1 1&#xf…

Rust实现基于Tokio的限制内存占用的channel

Rust实现基于Tokio的限制内存占用的channel 简介 本文介绍如何基于tokio的channel实现一个限制内存占用的channel。 Tokio提供了多种协程间同步的接口&#xff0c;用于在不同的协程中同步数据。 常用的channel有两种:bounded和unbounded&#xff0c;其中ubbounded的channel可…

网络基础知识100问

1.什么是链接? 链接是指两个设备之间的连接。它包括用于一个设备能够与另一个设备通信的电缆类型和协议。 2.OSI 参考模型的层次是什么? 有 7 个 OSI 层&#xff1a;物理层&#xff0c;数据链路层&#xff0c;网络层&#xff0c;传输层&#xff0c;会话层&#xff0c;表示…

记一次gitlab平台任意用户注册引发的源代码泄漏

文章目录 一、漏洞原因二、漏洞利用1、任意用户注册2、成功进入后台3、越权查看其他用户的仓库源代码4、发现源代码仓库泄漏5、通讯录的地方,发现账号泄漏泄漏三、漏洞进一步利用四、总结五、免责声明一、漏洞原因 可以任意注册账号通过越权,查看其他用户仓库内的源代码造成源…

小白必看,手把手教你重装系统

一&#xff0c;安装步骤 二&#xff0c;重装之前需要做的准备 1、重装之前请大家务必注意备份重要资料。电脑有价&#xff0c;数据无价——重要数据备份之后&#xff0c;电脑随便折腾都没问题。大不了就是重装不成功。系统软件问题多试几次总能解决的&#xff0c;但重要数据一…

前端性能优化 - 虚拟滚动

一 需求背景 需求&#xff1a;在一个表格里面一次性渲染全部数据&#xff0c;不采用分页形式&#xff0c;每行数据都有Echart图插入。 问题&#xff1a;图表渲染卡顿 技术栈&#xff1a;Vue、Element UI 卡顿原因&#xff1a;页面渲染时大量的元素参与到了重排的动作中&#x…

flutter复制口令返回app监听粘贴板

overridevoid didChangeAppLifecycleState(AppLifecycleState state) {switch (state) {case AppLifecycleState.inactive: // 处于这种状态的应用程序应该假设它们可能在任何时候暂停。break;case AppLifecycleState.resumed: //从后台切换前台&#xff0c;界面可见handle();b…

MySQL数据库(三)

文章目录 MySQL数据库一、约束条件二、约束条件之主键三、补充一些其它SQL语句四、表查找关键字Select与from五、查询关键字之where筛选六、查询关键字之group by分组七、分组补充函数八、关键字之having过滤九、关键字之distinct去重十、关键字之order by排序十一、关键字之li…

苹果开发者 Xcode发布TestFlight全流程

打包前注意事项 使用Xcode导出安装包之前&#xff0c;必须先确认账户的所有合约是否全部同意&#xff0c;如果有不同意的&#xff0c;在出包的时候会弹出报错 点击前往苹果开发者官网https://appstoreconnect.apple.com/agreements/ 登录自己的开发者账户后&#xff0c;可以看…

react项目实现文件预览,比如PDF、txt、word、Excel、ppt等常见文件(腾讯云cos)

使用腾讯云文档预览&#xff0c;需要开通文档预览功能&#xff0c;该功能需要收费的。 使用限制 如果需要图片预览、视频或音频可以使用获取下载链接。 页面代码 <button onClick() > {handleClick(myself/文档.xlsx)}>预览</button><div style{{ height:…

谈谈你对spring boot 3.0的理解

谈谈你对spring boot 3.0的理解 一&#xff0c;Spring Boot 3.0 的兼容性 Spring Boot 3.0 在兼容性方面做出了很大的努力&#xff0c;以支持存量项目和老项目。尽管如此&#xff0c;仍需注意以下几点&#xff1a; Java 版本要求&#xff1a;Spring Boot 3.0 要求使用 Java 1…

Boundary-Aware RGBD Salient Object Detection With Cross-Modal Feature Sampling

方法 体会 实验做得比较详细&#xff0c;但未公布代码

六、Python类的高级知识

一、类 情景&#xff1a; 如果不实例化一个类能否调用其中的函数类里面的函数如何减弱跟类的关系&#xff0c;在一起只是为了代码方便整洁 1.实例方法 示例&#xff1a; class TEST():def __init__(self):self.hellonihaodef printf(self):for i in range(1,5):print(str(…

浏览器标签上添加icon图标;html引用ico文件

实例 <link rel"shortcut icon" href"./XXX.ico" type"image/x-icon">页面和图标在同一目录内 则 <link rel"shortcut icon" type"text/css" href"study.ico"/>可以阿里矢量图库关键字搜索下载自己…

H3C SecParh堡垒机 data_provider.php 远程命令执行漏洞

构造poc执行远程命令&#xff1a; /audit/data_provider.php?ds_y2019&ds_m04&ds_d02&ds_hour09&ds_min40&server_cond&service$(id)&identity_cond&query_typeall&formatjson&browsetrue漏洞证明&#xff1a; 文笔生疏&#xff0c…

windows开机自启动和忘记密码-备忘

windows开机自启动和忘记密码-备忘 文章目录 windows开机自启动和忘记密码-备忘1.自启动网址定时任务方式 2.忘记windows用户密码 1.自启动 网址 参考博文&#xff1a;https://blog.csdn.net/wwzmvp/article/details/113656544&#xff0c;感谢博主。 定时任务方式 如图&#…

spring boot项目运行jar包读取包内resources目录下的文件

spring boot项目运行jar包读取包内resources目录下的文件 摘要码代码相关文章 摘要 Spring Boot 项目打包成 jar 包后&#xff0c;resources 目录下的文件将会被打包到 jar 包中。如果需要在 Spring Boot 项目运行 jar 包后读取 resources 目录下的文件&#xff0c;可以使用 t…

使用链表实现栈操作

源码如下&#xff1a; #include <stdio.h> #include <stdlib.h> struct node {int info;struct node *link; }; struct node *top NULL, *temp; void push(struct node *); void pop(struct node *); void display(struct node *);int main() {int x 0, item;pr…

【记录】1024徽章

对spring的理解 1、Spring是什么? Spring是一个轻量级的IoC和AOP容器框架。是为Java应用程序提供基础性服务的一套框架&#xff0c;目的是用于简化企业应用程序的开发&#xff0c;它使得开发者只需要关心业务需求。常见的配置方式有三种&#xff1a;基于XML的配置、基于注解…