Redisson框架

1. Redisson锁与Redis订阅与发布模式的联系:

Redisson锁中,使用订阅发布模式去通知等待锁的客户端:锁已经释放,可以进行抢锁。

  • publish channel_name message:将消息发送到指定频道
    • 解锁时,在Lua解锁脚本中使用:场景1:锁不存在时,发送消息到解锁频道;场景2:锁重入记数hash等于0时,发送消息到解锁频道。
  • subscribe channel_name ... :客户端订阅一个或多个频道消息
    • 加锁时,没有争抢锁成功的客户端,需要订阅解锁频道 -> RedissonLock#tryLock中订阅频道后,会用await()阻塞等待锁释放。直到获取推送的频道解锁消息,才取消等待。

  • unsubsribe channel_name ... :客户端退订一个或多个频道消息
    • 加锁时,客户端最后都会去退订解锁频道,不管加锁成功与失败。

2. RedissonLock锁:

数据结构:

  • Hash结构:key -> name 、field -> lockName (id+":"+threadId -> UUID:threadId) 、 value -> 可重入次数

加锁伪脚本:

// 如果不存在锁:则新增锁,并设置锁重入计数为1、设置锁过期时间,返回nil结束
exists name == 0                      ->  name:加锁的key名称
hset name lockName  1                 -> lockName:当前客户端标识
pexpire name  internalLockLeaseTime   -> internalLockLeaseTime:锁租期
return nil// 如果存在锁,且唯一标识也匹配,表示当前锁可重入,重入计数+1,并重新设置锁过期时间,返回nil结束
hexists  name lockName = 1
hincrby  name lockName  1 
pexpire name  internalLockLeaseTime 
return nil// 如果存在锁,且唯一标识不匹配,表示锁是被其他线程占用,返回剩余时间结束
return pttl name

解锁伪脚本:

// 如果锁不存在:则直接广播解锁消息,返回1,结束
exists name == 0 
publish channelName  0   -> channelName:频道名称 -> redisson_lock__channel:{name}
return 1// 如果锁存在,但是但唯一标识不匹配:表示锁被其他线程占用,当前线程不允许解锁,返回nil,结束
hexists name lockName == 0 
return nil// 如果锁存在,并且唯一标识匹配:则先将锁重入计数 -1
counter = hincrby name lockName -1
// 如果锁重入计数-1后,还大于0,重新设置有效期,返回0,结束
pexpire name  internalLockLeaseTime
return 0// 如果锁重入计数-1后,等于0,直接删除key,并广播解锁消息(即唤醒其它争抢锁的被阻塞的线程),返回1,结束
del name 
publish channelName  0
return 1

参考文档:https://www.cnblogs.com/huangwentian/p/14622441.html

3. RedissonFairLock锁:

数据结构:

  • Hash结构:key -> name 、field -> lockName (UUID:threadId)、 value -> 可重入次数
  • List结构:key -> redisson_lock_queue:{lockName} 、vaule -> lockName (UUID:threadId)
    • 作用:实现公平锁,按顺序记录线程争抢锁的顺序。
  • Zset结构:key -> redisson_lock_timeout:{lockName}、 score -> timeout(过期时间,它是时间戳) member -> lockName (id+":"+threadId)
    • 作用1:利用Zset分值的功能,清除等待队列中,等待加锁,但是已经过期的线程。
    • 作用2:利用Zset分值的功能,在加锁不成功后,计算需要等待的时间(ttl)和更新Zset分值(timeout)

加锁伪脚本:

/ 删除列表中第一个位置是过期的线程
// 死循环
while true do // list中获取第一元素,如果firstThreadId2为空,直接结束循环firstThreadId2 = lindex  redisson_lock_queue:{lockName}  0if firstThreadId2 == falsebreak;// 如果firstThreadId2不为空,从Zset中获取分值timeouttimeout = zscore redisson_lock_timeout:{lockName} firstThreadId2// 如果 timeout <= 当前时间戳,从Zset中删除该成员,从List弹出该元素if timeout <= currentTimezrem redisson_lock_timeout:{lockName}  firstThreadId2lpop  redisson_lock_queue:{lockName} // 如果 timeout > 当前时间戳,结束循环break;// 如果不存在name的key,并且不存队列或者队列第一个元素是lockName时,当前线程可以获得锁
if exists name == 0  && (exists redisson_lock_timeout:{lockName} == 0  ||   lindex  redisson_lock_queue:{lockName}  0 ==  lockName)  lpop redisson_lock_queue:{lockName}  zrem redisson_lock_timeout:{lockName}  lockNamehset name  lockName  1pexpire name internalLockLeaseTimereturn nil// 如果存在name的key,并且hash结构的field也是lockname时,表示当前线程已经获得锁,此时处理锁重入
if hexists name lockName == 1hincrby name lockName 1pexpire name internalLockLeaseTimereturn nil//  获取列表第一个元素,计算当前的ttl,然后再重新计算timeout赋值给Zset中,Zset添加成功后,再rpush到队列,返回ttl
firstThreadId = lindex  redisson_lock_queue:{lockName}  0
ttl
// 如果列表第一元素不为空,并且不是lockName时,从Zset中获取lockName的timeout,计算ttl
if firstThreadId ~= false and firstThreadId ~= lockNamettl = (zscore redisson_lock_timeout:{lockName} lockName) - currentTime// 列表第一元素是lockName时,获取ttlttl = pttl name// 计算 timeouttimeout = ttl + (currentTime + threadWaitTime);// 如果 zadd 添加成功,则 rpushif (zadd  redisson_lock_timeout:{lockName}  timeout lockName) == 1rpush redisson_lock_queue:{lockName}  return ttl          

解锁伪脚本:

// 删除列表中第一个位置是过期的线程
... ...
// 1. 如果不存name时,获取列表第一个元素,并且第一元素不为空,向解锁频道发送通知
if exists name == 0nextThreadId = lindex  redisson_lock_queue:{lockName}  0if nextThreadId ~= false publish channel_name .. ':' .. nextThreadId 0return 1// 2. 如果 hexists name lockName 不存在时,直接返回nil
if hexists name lockName == 0return nil// 3. 如果 hexists name lockName 存在时,处理锁可重入,或删除key,通知下一个元素
counter = hincrby  name  lockName  -1
// 如果counter大于0,设置过期时间
if counter > 0pexpire name internalLockLeaseTime
return 0
// 如果counter小于等于0,删除key
del name
// 获取列表第一个元素,如果不为空,发送解锁通知
nextThreadId = lindex  redisson_lock_queue:{lockName}  0
if nextThreadId ~= false publish channel_name .. ':' .. nextThreadId 0
return 1   

4. 延迟队列:

两个API概念:

  • RBlockingQueue 就是目标队列
  • RDelayedQueue 就是中转队列

  • 我们实际操作的是RBlockingQueue队列,并不是RDelayedQueue队列,RDelayedQueue主要是提供中间转发的一个队列。
  • 我们都是基于RBlockingQueue目标队列在进行消费,而RDelayedQueue就是会把过期的消息放入到我们的目标队列中。
  • 我们只要从RBlockingQueue队列中取数据即可。

数据结构:

  • ZSet结构:key -> redisson_delay_queue_timeout:{name}、score -> System.currentTimeMillis() + delayInMs、member ->数据使用 struct.pack()包装,包含 'dLc0'、randomId、string.len(数据)、数据
    • 作用:该队列实现了获取延迟数据的功能。
  • List结构:key-> redisson_delay_queue:{name} 、value -> 数据使用 struct.pack()包装,包含 'dLc0'、randomId、string.len(数据)、数据
    • 作用:该队列记录了所有数据的加入顺序。
  • List结构:key -> 出入阻塞队列的名称,上边的{name}、value -> 数据

处理流程(展示一条主线):

Redisson延迟队列剖析_why redissondelayedqueue use list-CSDN博客

  • 实例化RedissonDelayedQueue时:
  1. 设置频道名称(redisson_delay_queue_channel:{name})、队列名称(redisson_delay_queue:{name})、延迟队列名称(redisson_delay_queue_timeout:{name})
  2. 设一个“将到期的任务推送阻塞队列”的任务(i.获取到期数据,到阻塞队列的Lua脚本;ii.设置发布订阅的主题)
  3. 启动该任务,并且添加相关监听(i.添加监听订阅发布的事件 ->pushTask() ;ii.添加监听消息的事件->scheduleTask(startTime))
  • 添加消息时:
  1. 向timeoutSet延迟队列添加数据
  2. 向queue队列添加数据
  3. 获取timeoutSet延迟队列的队头数据,如果队头数据等于刚加入的数据,则向频道发送消息
  • 触发频道监听事件->scheduleTask(startTime):
  1. 计算延迟时间 -> delay = startTime - System.currentTimeMillis()
  2. 如果延迟大于10,启动延迟任务
  3. 否则立即执行启动 -> pushTask()
  • 将到期的任务推送到阻塞队列 -> pushTask():
  1. 调用实例化时,定义的pushTaskAsync
  2. 添加监听操作完成的事件,在执行成功后,判断返回值是否为空,不为空调用scheduleTask(future.getNow())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/36755.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何把项目文文件/文件夹)上传到Gitee(全网最细)

目录 1、首先必须要有一个Gitee官网的账号 2、点击右上角的号&#xff0c;点击新建仓库 3、按照下图步骤&#xff0c;自己起仓库名字&#xff0c;开发语言 4、点击初始化readme文件 5、在自己的电脑上选择姚上传的文件夹&#xff0c;或者文件&#xff0c;这里都是一样的&a…

内网渗透:端口转发(SSH隧道)

SSH&#xff1a;两台设备之间进行远程登录的协议&#xff08;SSH本身就是一个隧道协议&#xff09; 远程文件传输scp命令&#xff08;scp是基于SSH的&#xff09; 拓扑&#xff1a; SSH隧道搭建的条件 1.获取到跳板机权限 2.跳板机中SSH服务启动 SSH端口转发分类&#xff1…

C语言 实现链表的各种功能

C语言实现链表的各种功能 链表的定义 链表是一种数据结构&#xff0c;它是由一系列节点组成的线性结构。每个节点包含两个部分&#xff1a;数据和指针。数据部分存储着实际的数据&#xff0c;指针部分指向下一个节点。 链表的特点是&#xff1a; 每个节点都可以自由地插入或…

python爬虫:实现程序模拟点击搜索等任务,爬取动态网页

引言: 爬虫也被称为网络蜘蛛(Spider),是一种自动化的软件程序,能够在互联网上漫游,按照一定的规则和算法抓取数据。 爬虫技术广泛应用于搜索引擎、 数据挖掘 、信息提取等领域,是互联网技术的重要组成部分。 摘要: 作为爬虫的初学者,网页越简单越好,因为网页的结构…

正点原子rk3588烧录linux和安卓镜像

1、烧录 Linux buildroot 系统镜像 1.1 进入 Loader 模式&#xff1a; 按住开发板上的 V&#xff08;音量&#xff09;按键不松&#xff0c;给开发板 上电或复位&#xff0c;此时烧录工具会提示&#xff1a;发现一个 LOADER 设备&#xff0c;表示开发板此时已经处于 Loader 模…

【爆肝34万字】从零开始学Python第2天: 判断语句【入门到放弃】

目录 前言判断语句True、False简单使用作用 比较运算符引入比较运算符的分类比较运算符的结果示例代码总结 逻辑运算符引入逻辑运算符的简单使用逻辑运算符与比较运算符一起使用特殊情况下的逻辑运算符 if 判断语句引入基本使用案例演示案例补充随堂练习 else 判断子句引入else…

Node.js 事件循环的工作流程二

在每个tick的过程中&#xff0c;如何判断是否有事件需要处理 在 Node.js 的事件循环中&#xff0c;每个循环迭代称为一个 "tick"。在每个 tick 过程中&#xff0c;事件循环需要判断是否有事件需要处理。这个过程主要依赖于检查各个阶段的队列是否有待处理的回调函数…

第3章:Electron的核心概念(2)

3.4 预加载脚本 预加载脚本在渲染进程加载前执行&#xff0c;允许在渲染器上下文中暴露自定义 API&#xff0c;并提供与主进程安全通信的桥梁。使用预加载脚本可以提高应用的安全性&#xff0c;尤其是在启用了 contextIsolation 的情况下。 3.4.1 创建预加载脚本 预加载脚本…

【Docker Compose】掌握容器资源管理:高效限制CPU与内存使用

【Docker Compose】掌握容器资源管理:高效限制CPU与内存使用 一、Docker Compose 介绍1.1 Docker Compose简介1.2 Docker Compose V2简介1.3 Docker Compose V1与V2版本区别1.4 docker-compose.yaml部署文件介绍二、检查本地docker环境2.1 本地环境规划2.2 检查docker版本2.3 …

43.三倍游戏

上海市计算机学会竞赛平台 | YACSYACS 是由上海市计算机学会于2019年发起的活动,旨在激发青少年对学习人工智能与算法设计的热情与兴趣,提升青少年科学素养,引导青少年投身创新发现和科研实践活动。https://www.iai.sh.cn/problem/390 题目描述 三倍游戏是一种单人游戏。玩…

基于51单片机心形LED流水灯电路原理图、PCB和源程序(SCH、PCB源文件)

资料下载地址&#xff1a;基于51单片机心形LED流水灯电路原理图、PCB和源程序&#xff08;SCH、PCB源文件&#xff09; 1、单片机心形LED流水灯功能说明&#xff1a; 单片机&#xff1a;无论是散件还是成品&#xff0c;单片机里面都烧录有LED 流水灯的程序&#xff0c;装上单片…

TCP重传机制

TCP重传机制是为了确保数据可靠传输而设计的&#xff0c;当传输的数据包丢失或损坏时&#xff0c;TCP会重新发送这些数据包。TCP重传主要有以下几种方式&#xff1a; 超时重传&#xff08;Timeout Retransmission&#xff09;&#xff1a; 当发送方发送一个数据包后&#xff…

【Promise】聊聊任务队列

历史小剧场 现实是残酷的&#xff0c;而今这个世界&#xff0c;要活下去&#xff0c;必死需要更大的勇气。 但崇祯的死&#xff0c;并非懦弱&#xff0c;而是一种态度&#xff0c;负责人的态度。 我说过&#xff0c;所谓王朝&#xff0c;跟公司单位差不多&#xff0c;单位出了事…

fly_bid项目

Ncurses库&#xff1a; 提供了创建窗口界面&#xff0c;移动光标&#xff0c;产生颜色&#xff0c;处理键盘按键功能问题等功能。 vim界面&#xff0c;内核配置界面&#xff08;make menuconfig&#xff09; #include<stdio.h> #include<curses.h>int main(int …

【UE 网络】RPC远程过程调用 入门篇

目录 0 引言1 RPC基本概念1.1 定义1.2 分类 2 RPC的使用2.1 Client RPC2.2 Server RPC2.3 Multicast RPC &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;UE虚幻引擎专栏&#x1f4a5; 标题&#xff1a;【UE 网络】RPC远程过程调用 入门篇❣️ 寄语…

(七)React:useEffect的理解和使用

1. useEffect的概念理解 useEffect是一个React Hook函数&#xff0c;用于React组件中创建不是由事件引起而是由渲染本身引起的操作&#xff0c;比如发送AJAX请求&#xff0c;更改DOM等等 说明&#xff1a;上面的组件中没有发生任何的用户事件&#xff0c;组件渲染完毕之后就需…

Modbus TCP与TCP/IP协议间的差异与应用场景

Modbus TCP概述 Modbus协议简介 Modbus是一种专为工业自动化系统设计的通信协议&#xff0c;采用主从模式&#xff0c;即一个主设备&#xff08;通常是计算机或可编程逻辑控制器&#xff09;与多个从设备&#xff08;如传感器、执行器等&#xff09;进行通信。Modbus协议具有…

项目优化方案之---实现单设备登录限制

对于一些付费会员或者一些商业项目&#xff0c;为了保证单个用户的账号权益不会被滥用&#xff0c;并且提高系统的安全性&#xff0c;我们会限制单个账号在同一时间内只能有一台设备登录&#xff0c; 来给系统添加共享账号的检测能能力 这里是根据jwt实现的&#xff0c;要实现…

K8S集群进行分布式负载测试

使用K8S集群执行分布式负载测试 本教程介绍如何使用Kubernetes部署分布式负载测试框架&#xff0c;该框架使用分布式部署的locust 产生压测流量&#xff0c;对一个部署到 K8S集群的 Web 应用执行负载测试&#xff0c;该 Web 应用公开了 REST 格式的端点&#xff0c;以响应传入…

初探Xcode工具

初探Xcode工具 Xcode是苹果公司为Mac OS X和iOS平台开发软件的集成开发环境&#xff08;IDE&#xff09;。作为苹果开发者的首选工具&#xff0c;Xcode提供了一系列强大的功能&#xff0c;帮助开发者设计、编写、调试和发布应用程序。本文将对Xcode进行初步探索&#xff0c;介…