局域网网站建设/热搜榜排名今日事件

局域网网站建设,热搜榜排名今日事件,wordpress 自定义内容模型,动漫设计与制作专业的应用领域本文将继续讨论基于Consul的分布式锁实现。信号量是我们在实现并发控制时会经常使用的手段,主要用来限制同时并发线程或进程的数量,比如:Zuul默认情况下就使用信号量来限制每个路由的并发数,以实现不同路由间的资源隔离。 信号量(…

本文将继续讨论基于Consul的分布式锁实现。信号量是我们在实现并发控制时会经常使用的手段,主要用来限制同时并发线程或进程的数量,比如:Zuul默认情况下就使用信号量来限制每个路由的并发数,以实现不同路由间的资源隔离。

信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量VI,然后将Acquire Semaphore VI以及Release Semaphore VI分别放置在每个关键代码段的首末端,确认这些信号量VI引用的是初始创建的信号量。如在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。

实现思路

  • 信号量存储:semaphore/key
  • acquired操作:
    • 创建session
    • 锁定key竞争者:semaphore/key/session
    • 查询信号量:semaphore/key/.lock,可以获得如下内容(如果是第一次创建信号量,将获取不到,这个时候就直接创建)
{
"limit": 3,
"holders": [
"90c0772a-4bd3-3a3c-8215-3b8937e36027",
"93e5611d-5365-a374-8190-f80c4a7280ab"
]
}
  • 如果持有者已达上限,返回false,如果阻塞模式,就继续尝试acquired操作
  • 如果持有者未达上限,更新semaphore/key/.lock的内容,将当前线程的sessionId加入到holders中。注意:更新的时候需要设置cas,它的值是“查询信号量”步骤获得的“ModifyIndex”值,该值用于保证更新操作的基础没有被其他竞争者更新。如果更新成功,就开始执行具体逻辑。如果没有更新成功,说明有其他竞争者抢占了资源,返回false,阻塞模式下继续尝试acquired操作
  • release操作:
    • 从semaphore/key/.lock的holders中移除当前sessionId
    • 删除semaphore/key/session
    • 删除当前的session

流程图

代码实现

public class Semaphore {

private Logger logger = Logger.getLogger(getClass());

private static final String prefix = "semaphore/"; // 信号量参数前缀

private ConsulClient consulClient;
private int limit;
private String keyPath;
private String sessionId = null;
private boolean acquired = false;

/**
*
* @param consulClient consul客户端实例
* @param limit 信号量上限值
* @param keyPath 信号量在consul中存储的参数路径
*/
public Semaphore(ConsulClient consulClient, int limit, String keyPath) {
this.consulClient = consulClient;
this.limit = limit;
this.keyPath = prefix + keyPath;
}

/**
* acquired信号量
*
* @param block 是否阻塞。如果为true,那么一直尝试,直到获取到该资源为止。
* @return
* @throws IOException
*/
public Boolean acquired(boolean block) throws IOException {

if(acquired) {
logger.error(sessionId + " - Already acquired");
throw new RuntimeException(sessionId + " - Already acquired");
}

// create session
clearSession();
this.sessionId = createSessionId("semaphore");
logger.debug("Create session : " + sessionId);

// add contender entry
String contenderKey = keyPath + "/" + sessionId;
logger.debug("contenderKey : " + contenderKey);
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
Boolean b = consulClient.setKVValue(contenderKey, "", putParams).getValue();
if(!b) {
logger.error("Failed to add contender entry : " + contenderKey + ", " + sessionId);
throw new RuntimeException("Failed to add contender entry : " + contenderKey + ", " + sessionId);
}

while(true) {
// try to take the semaphore
String lockKey = keyPath + "/.lock";
String lockKeyValue;

GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();

if (lockKeyContent != null) {
// lock值转换
lockKeyValue = lockKeyContent.getValue();
BASE64Decoder decoder = new BASE64Decoder();
byte[] v = decoder.decodeBuffer(lockKeyValue);
String lockKeyValueDecode = new String(v);
logger.debug("lockKey=" + lockKey + ", lockKeyValueDecode=" + lockKeyValueDecode);

Gson gson = new Gson();
ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
// 当前信号量已满
if(contenderValue.getLimit() == contenderValue.getHolders().size()) {
logger.debug("Semaphore limited " + contenderValue.getLimit() + ", waiting...");
if(block) {
// 如果是阻塞模式,再尝试
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
}
continue;
}
// 非阻塞模式,直接返回没有获取到信号量
return false;
}
// 信号量增加
contenderValue.getHolders().add(sessionId);
putParams = new PutParams();
putParams.setCas(lockKeyContent.getModifyIndex());
boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
if(c) {
acquired = true;
return true;
}
else
continue;
} else {
// 当前信号量还没有,所以创建一个,并马上抢占一个资源
ContenderValue contenderValue = new ContenderValue();
contenderValue.setLimit(limit);
contenderValue.getHolders().add(sessionId);

putParams = new PutParams();
putParams.setCas(0L);
boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
if (c) {
acquired = true;
return true;
}
continue;
}
}
}

/**
* 创建sessionId
* @param sessionName
* @return
*/
public String createSessionId(String sessionName) {
NewSession newSession = new NewSession();
newSession.setName(sessionName);
return consulClient.sessionCreate(newSession, null).getValue();
}

/**
* 释放session、并从lock中移除当前的sessionId
* @throws IOException
*/
public void release() throws IOException {
if(this.acquired) {
// remove session from lock
while(true) {
String contenderKey = keyPath + "/" + sessionId;
String lockKey = keyPath + "/.lock";
String lockKeyValue;

GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
if (lockKeyContent != null) {
// lock值转换
lockKeyValue = lockKeyContent.getValue();
BASE64Decoder decoder = new BASE64Decoder();
byte[] v = decoder.decodeBuffer(lockKeyValue);
String lockKeyValueDecode = new String(v);
Gson gson = new Gson();
ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
contenderValue.getHolders().remove(sessionId);
PutParams putParams = new PutParams();
putParams.setCas(lockKeyContent.getModifyIndex());
consulClient.deleteKVValue(contenderKey);
boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
if(c) {
break;
}
}
}
// remove session key

}
this.acquired = false;
clearSession();
}

public void clearSession() {
if(sessionId != null) {
consulClient.sessionDestroy(sessionId, null);
sessionId = null;
}
}

class ContenderValue implements Serializable {

private Integer limit;
private List<String> holders = new ArrayList<>();

public Integer getLimit() {
return limit;
}

public void setLimit(Integer limit) {
this.limit = limit;
}

public List<String> getHolders() {
return holders;
}

public void setHolders(List<String> holders) {
this.holders = holders;
}

@Override
public String toString() {
return new Gson().toJson(this);
}

}

}

单元测试

下面单元测试的逻辑:通过线程的方式来模拟不同的分布式服务来获取信号量执行业务逻辑。由于信号量与简单的分布式互斥锁有所不同,它不是只限定一个线程可以操作,而是可以控制多个线程的并发,所以通过下面的单元测试,我们设置信号量为3,然后同时启动15个线程来竞争的情况,来观察分布式信号量实现的结果如何。

public class TestLock {

private Logger logger = Logger.getLogger(getClass());

@Test
public void testSemaphore() throws Exception {
new Thread(new SemaphoreRunner(1)).start();
new Thread(new SemaphoreRunner(2)).start();
new Thread(new SemaphoreRunner(3)).start();
new Thread(new SemaphoreRunner(4)).start();
new Thread(new SemaphoreRunner(5)).start();
new Thread(new SemaphoreRunner(6)).start();
new Thread(new SemaphoreRunner(7)).start();
new Thread(new SemaphoreRunner(8)).start();
new Thread(new SemaphoreRunner(9)).start();
new Thread(new SemaphoreRunner(10)).start();
Thread.sleep(1000000L);
}
}

public class SemaphoreRunner implements Runnable {

private Logger logger = Logger.getLogger(getClass());

private int flag;

public SemaphoreRunner(int flag) {
this.flag = flag;
}

@Override
public void run() {
Semaphore semaphore = new Semaphore(new ConsulClient(), 3, "mg-init");
try {
if (semaphore.acquired(true)) {
// 获取到信号量,执行业务逻辑
logger.info("Thread " + flag + " start!");
Thread.sleep(new Random().nextInt(10000));
logger.info("Thread " + flag + " end!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 信号量释放、Session锁释放、Session删除
semaphore.release();
} catch (IOException e) {
e.printStackTrace();
}
}

}
}
INFO  [Thread-6] SemaphoreRunner - Thread 7 start!
INFO [Thread-2] SemaphoreRunner - Thread 3 start!
INFO [Thread-7] SemaphoreRunner - Thread 8 start!
INFO [Thread-2] SemaphoreRunner - Thread 3 end!
INFO [Thread-5] SemaphoreRunner - Thread 6 start!
INFO [Thread-6] SemaphoreRunner - Thread 7 end!
INFO [Thread-9] SemaphoreRunner - Thread 10 start!
INFO [Thread-5] SemaphoreRunner - Thread 6 end!
INFO [Thread-1] SemaphoreRunner - Thread 2 start!
INFO [Thread-7] SemaphoreRunner - Thread 8 end!
INFO [Thread-10] SemaphoreRunner - Thread 11 start!
INFO [Thread-10] SemaphoreRunner - Thread 11 end!
INFO [Thread-12] SemaphoreRunner - Thread 13 start!
INFO [Thread-1] SemaphoreRunner - Thread 2 end!
INFO [Thread-3] SemaphoreRunner - Thread 4 start!
INFO [Thread-9] SemaphoreRunner - Thread 10 end!
INFO [Thread-0] SemaphoreRunner - Thread 1 start!
INFO [Thread-3] SemaphoreRunner - Thread 4 end!
INFO [Thread-14] SemaphoreRunner - Thread 15 start!
INFO [Thread-12] SemaphoreRunner - Thread 13 end!
INFO [Thread-0] SemaphoreRunner - Thread 1 end!
INFO [Thread-13] SemaphoreRunner - Thread 14 start!
INFO [Thread-11] SemaphoreRunner - Thread 12 start!
INFO [Thread-13] SemaphoreRunner - Thread 14 end!
INFO [Thread-4] SemaphoreRunner - Thread 5 start!
INFO [Thread-4] SemaphoreRunner - Thread 5 end!
INFO [Thread-8] SemaphoreRunner - Thread 9 start!
INFO [Thread-11] SemaphoreRunner - Thread 12 end!
INFO [Thread-14] SemaphoreRunner - Thread 15 end!
INFO [Thread-8] SemaphoreRunner - Thread 9 end!

从测试结果,我们可以发现当信号量持有者数量达到信号量上限3的时候,其他竞争者就开始进行等待了,只有当某个持有者释放信号量之后,才会有新的线程变成持有者,从而开始执行自己的业务逻辑。所以,分布式信号量可以帮助我们有效的控制同时操作某个共享资源的并发数。

优化建议

同前文一样,这里只是做了简单的实现。线上应用还必须加入TTL的session清理以及对.lock资源中的无效holder进行清理的机制。

参考文档:https://www.consul.io/docs/guides/semaphore.html

实现代码

  • GitHub:https://github.com/dyc87112/consul-distributed-lock
  • 开源中国:http://git.oschina.net/didispace/consul-distributed-lock

money.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477461.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图谱实战 | 图视角下的信息抽取技术研究

导读&#xff1a;本次分享题目为《图视角下的信息抽取技术研究》&#xff0c;主要介绍&#xff1a;研究背景和意义国内外研究现状研究目标与内容主要成果与创新之处完成项目及发表论文情况分享嘉宾&#xff5c;郁博文博士 达摩院 算法专家编辑整理&#xff5c;王露出品平台&…

LeetCode 1335. 工作计划的最低难度(DP)

1. 题目 你需要制定一份 d 天的工作计划表。工作之间存在依赖&#xff0c;要想执行第 i 项工作&#xff0c;你必须完成全部 j 项工作&#xff08; 0 < j < i&#xff09;。 你每天 至少 需要完成一项任务。工作计划的总难度是这 d 天每一天的难度之和&#xff0c;而一天…

CCKS-面向数字商务的知识图谱比赛重磅上线,奖金等你来拿!

​CCKS 2022面向数字商务的知识图谱评测赛题介绍阿里巴巴商品数据规模庞大&#xff0c;商品知识图谱为海量异构的商品数据的组织、管理和利用提供了有效的方式。商品数据模态丰富&#xff0c;动态性高&#xff0c;数据存在噪声&#xff0c;这些都对商品的认知和理解带来了巨大挑…

基于Consul的分布式锁实现

我们在构建分布式系统的时候&#xff0c;经常需要控制对共享资源的互斥访问。这个时候我们就涉及到分布式锁&#xff08;也称为全局锁&#xff09;的实现&#xff0c;基于目前的各种工具&#xff0c;我们已经有了大量的实现方式&#xff0c;比如&#xff1a;基于Redis的实现、基…

LeetCode 1289. 下降路径最小和 II(DP)

1. 题目 给你一个整数方阵 arr &#xff0c;定义「非零偏移下降路径」为&#xff1a;从 arr 数组中的每一行选择一个数字&#xff0c;且按顺序选出来的数字中&#xff0c;相邻数字不在原数组的同一列。 请你返回非零偏移下降路径数字和的最小值。 示例 1&#xff1a; 输入&a…

警惕“不续签劳动合同”式裁员

文 | 是小酒呀源 | 知乎正文内容来源于作者 是小酒呀知乎&#xff0c;原文链接&#xff1a;https://zhuanlan.zhihu.com/p/499838511没想到裁员还有不续约这招&#xff1f;&#xff01;&#xff01;3月14日一大早&#xff0c;leader叫我去会议室&#xff0c;进入会议室后&#…

图谱实战 | OPPO自研大规模知识图谱及其在小布助手中的应用

分享嘉宾&#xff1a;李向林 OPPO编辑整理&#xff1a;吴祺尧 加州大学圣地亚哥分校出品平台&#xff1a;DataFunTalk导读&#xff1a;OPPO知识图谱是OPPO数智工程系统小布助手团队主导、多团队协作建设的自研大规模通用知识图谱&#xff0c;目前已达到数亿实体和数十亿三元组的…

LeetCode 1284. 转化为全零矩阵的最少反转次数(BFS 矩阵状态编码解码)

1. 题目 给你一个 m x n 的二进制矩阵 mat。 每一步&#xff0c;你可以选择一个单元格并将它反转&#xff08;反转表示 0 变 1 &#xff0c;1 变 0 &#xff09;。如果存在和它相邻的单元格&#xff0c;那么这些相邻的单元格也会被反转。&#xff08;注&#xff1a;相邻的两个…

Netflix Zuul与Nginx的性能对比

这是一篇翻译&#xff0c;关于大家经常质疑的一个问题&#xff1a;API网关Zuul的性能。原文&#xff1a;NETFLIX ZUUL VS NGINX PERFORMANCE作者&#xff1a;STANISLAV MIKLIK 如今你可以听到很多关于“微服务”的信息。Spring Boot是一个用来构建单个微服务应用的理想选择&…

LeetCode 773. 滑动谜题(BFS 地图状态转换的最短距离)

1. 题目 在一个 2 x 3 的板上&#xff08;board&#xff09;有 5 块砖瓦&#xff0c;用数字 1~5 来表示, 以及一块空缺用 0 来表示. 一次移动定义为选择 0 与一个相邻的数字&#xff08;上下左右&#xff09;进行交换. 最终当板 board 的结果是 [[1,2,3],[4,5,0]] 谜板被解开…

Spring Cloud实战小贴士:随机端口

太久没有更新&#xff0c;一时不知道该从哪儿开始&#xff0c;索性就从一个小技巧开始吧。 在之前的《Spring Cloud构建微服务架构》系列博文中&#xff0c;我们经常会需要启动多个实例的情况来测试注册中心、配置中心等基础设施的高可用&#xff0c;也会用来测试客户端负载均衡…

图谱实战 | 基于金融场景的事理图谱构建与应用

分享嘉宾&#xff1a;肖楠 京东科技 算法专家编辑整理&#xff1a;付村 云融创新出品平台&#xff1a;DataFunTalk导读&#xff1a;今天分享京东科技近期在事理图谱构建和应用方面的研究成果&#xff0c;主要分为以下五个部分&#xff1a;京东科技图谱简介金融事理图谱构建因果…

综述 | 基于深度学习的目标检测算法

计算机视觉是人工智能的关键领域之一&#xff0c;是一门研究如何使机器“看”的科学。图像目标检测又是计算机视觉的关键任务&#xff0c;主要对图像或视频中的物体进行识别和定位&#xff0c;是AI后续应用的基础。因此&#xff0c;检测性能的好坏直接影响到后续目标 追踪、动作…

LeetCode 1337. 方阵中战斗力最弱的 K 行(优先队列)

1. 题目 给你一个大小为 m * n 的方阵 mat&#xff0c;方阵由若干军人和平民组成&#xff0c;分别用 0 和 1 表示。 请你返回方阵中战斗力最弱的 k 行的索引&#xff0c;按从最弱到最强排序。 如果第 i 行的军人数量少于第 j 行&#xff0c;或者两行军人数量相同但 i 小于 j…

Spring Cloud实战小贴士:版本依赖关系

去年在博客上连载了《Spring Cloud构建微服务架构》的系列博文&#xff0c;虽然这部分内容得到了不少关注者们的支持&#xff0c;但是不得不说这些内容只是适用于Spring Cloud入门阶段对各个组件的初步认识。所以&#xff0c;今年除了将会继续更新《Spring Cloud构建微服务架构…

关于神经网络,一个学术界搞错了很多年的问题

文 | 五楼知乎说一个近年来神经网络方面澄清的一个误解。BP算法自八十年代发明以来&#xff0c;一直是神经网络优化的最基本的方法。神经网络普遍都是很难优化的&#xff0c;尤其是当中间隐含层神经元的个数较多或者隐含层层数较多的时候。长期以来&#xff0c;人们普遍认为&am…

Consul注销实例时候的问题

当我们在Spring Cloud应用中使用Consul来实现服务治理时&#xff0c;由于Consul不会自动将不可用的服务实例注销掉&#xff08;deregister&#xff09;&#xff0c;这使得在实际使用过程中&#xff0c;可能因为一些操作失误、环境变更等原因让Consul中存在一些无效实例信息&…

刷榜思路少?顶级中文NLP比赛解题方法直播来了!

如何利用稠密向量表示学习方法提升模型在大规模中文段落数据中的检索能力&#xff1f;如何利用外部知识信息减少模型在智能音箱、语音助手等产品中因知识不足导致的“智商”掉线&#xff1f;深度学习在众多NLP任务取得惊艳表现&#xff0c;但如何缓解其黑盒属性造成的应用落地「…

LeetCode 1338. 数组大小减半

1. 题目 给你一个整数数组 arr。你可以从中选出一个整数集合&#xff0c;并删除这些整数在数组中的每次出现。 返回 至少 能删除数组中的一半整数的整数集合的最小大小。 示例 1&#xff1a; 输入&#xff1a;arr [3,3,3,3,5,5,5,2,2,7] 输出&#xff1a;2 解释&#xff1a…

技术动态 | 知识图谱的自监督学习与逻辑推理

分享嘉宾&#xff1a;胡子牛 UCLA PHD编辑整理&#xff1a;wei ai-fir出品平台&#xff1a;DataFunTalk导读&#xff1a;知识图谱是一种多关系的图结构&#xff0c;每个节点表示一个实体&#xff0c;每个边表示连接的两个节点之间的关系&#xff0c;可以将图谱中建模的节点和边…