并发编程-06之Semaphore

在这里插入图片描述

一 Semaphore入门
1.1 什么是Semaphore
Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。
Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。

什么是pv操作?
PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
①S减1;
②若S减1后仍大于或等于0,则进程继续执行;
③若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
V操作的主要动作是:
①S加1;
②若相加后结果大于0,则进程继续执行;
③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。
1.2 Semaphore的常用方法
构造器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
● permits 表示许可证的数量(资源数)
● fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
常用方法
public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
● acquire() 表示尝试获取许可(获取不到则阻塞)。
● tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞。
● release() 表示释放许可。
● int availablePermits():返回此信号量中当前可用的许可证数。
● int getQueueLength():返回正在等待获取许可证的线程数。
● boolean hasQueuedThreads():是否有线程正在等待获取许可证。
● void reducePermit(int reduction):减少 reduction 个许可证
● Collection getQueuedThreads():返回所有等待获取许可证的线程集合
1.3 应用场景
可以用于做流量控制,特别是公用资源有限的应用场景
代码演示:
模拟一个每5S只能处理5个请求的限流Demo
/**

  • 限流测试

  • @author wcy
    */
    @Slf4j
    public class SemaphoneDemo1 {

    /**

    • 实现一个同时只能处理5个请求的限流器
      */
      private static Semaphore semaphore = new Semaphore(5);

    /**

    • 定义一个线程池
      */
      private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
      10, 50,
      60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));

    /**

    • 模拟执行方法
      */
      public static void exec() {
      try {
      semaphore.acquire(1);
      // 模拟真实方法执行
      log.info(“执行exec方法”);
      Thread.sleep(5000);
      } catch (Exception e) {
      e.printStackTrace();
      } finally {
      semaphore.release(1);
      }
      }

    public static void main(String[] args) throws InterruptedException {

     {for (; ; ) {Thread.sleep(100);// 模拟请求以10个/s的速度executor.execute(() -> exec());}}
    

    }
    }
    运行结果:

可以看出,每个周期内只能5个线程执行了方法
二 Semaphore原理
学习Semaphore源码的时候我们有两个关注点:

  1. Semaphore的加锁解锁(共享锁)逻辑实现

  2. 线程竞争锁失败入队阻塞逻辑和获取锁的线程释放锁唤醒阻塞线程竞争锁的逻辑实现
    加锁逻辑(acquire)
    public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
    }
    这里调用了同步器的acquireSharedInterruptibly(int arg)方法
    public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }
    先看判断逻辑tryAcquireShared(arg)方法,这是同步器子类实现的
    protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
    }
    再看nonfairTryAcquireShared(acquires)方法
    final int nonfairTryAcquireShared(int acquires) {
    for (;😉 {
    //获取许可证数量
    int available = getState();
    //减去当前线程使用的许可数
    int remaining = available - acquires;
    if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;
    }
    }
    返回可用的许可数,如果<0,说明没有可用的许可,就会进入 doAcquireSharedInterruptibly(arg)方法,这个方法也是同步器实现的
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;😉 {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    注意:Node node = addWaiter(Node.SHARED);这是构建队列的方法,但是和ReentrantLock不同的是,这里参数传的是Node.SHARED,第一个if逻辑是当同步队列中第一个线程被唤醒后,会进入这里重新竞争锁,竞争成功后,做出队的操作,我们假设这里是第一次构建队列,先看addWaiter(Node.SHARED)方法
    static final Node SHARED = new Node();
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    enq(node);
    return node;
    }
    这里第一个线程入队,会调用enq方法构建队列,后来的线程会进入if分支,加入队列尾部。当前线程Node的nextWaiter=Node.SHARED
    private Node enq(final Node node) {
    for (;😉 {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }
    tail仍然为空,通过cas操作,新建一个头节点,这就是并发的精髓了,通过一个死循环,第二次循环的时候tail不为空,进入else逻辑,把当前线程所在的节点的前驱节点指向前边的结点,并把当前线程节点设置为尾结点。(这里通过cas保证线程安全问题),至此,我们的队列构建完成,回到doAcquireSharedInterruptibly方法中,可以看出,如果当前线程节点的前驱节点如果是头节点是话还会进行一次cas操作去尝试获取许可,假设还没有线程释放许可,返回负数,进入第二个if逻辑中,有两个判断方法,shouldParkAfterFailedAcquire(p, node)和parkAndCheckInterrupt()方法
    先看shouldParkAfterFailedAcquire(p, node)方法
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
    /*
    * This node has already set status asking a release
    * to signal it, so it can safely park.
    /
    return true;
    if (ws > 0) {
    /

    * Predecessor was cancelled. Skip over predecessors and
    * indicate retry.
    /
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    /

    * waitStatus must be 0 or PROPAGATE. Indicate that we
    * need a signal, but don’t park yet. Caller will need to
    * retry to make sure it cannot acquire before parking.
    /
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }
    先判断前驱节点的waitStatus是否为-1.如果不是-1,通过cas操作改为-1,返回false,外边是一个死循环,会第二次进入这个方法,这次判断为-1,返回true,进入parkAndCheckInterrupt()方法,
    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    }
    在这里,我们的线程就阻塞着了。
    解锁逻辑(release):
    public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
    }
    接着看releaseShared方法,这个是由同步器来实现的
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
    }
    return false;
    }
    接着看tryReleaseShared,这个是同步器子类实现的,主要目的就是释放一个资源许可。
    protected final boolean tryReleaseShared(int releases) {
    for (;😉 {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
    throw new Error(“Maximum permit count exceeded”);
    if (compareAndSetState(current, next))
    return true;
    }
    }
    这里释放锁后,许可加1,执行doReleaseShared()方法
    doReleaseShared()方法,
    private void doReleaseShared() {
    /

    * Ensure that a release propagates, even if there are other
    * in-progress acquires/releases. This proceeds in the usual
    * way of trying to unparkSuccessor of head if it needs
    * signal. But if it does not, status is set to PROPAGATE to
    * ensure that upon release, propagation continues.
    * Additionally, we must loop in case a new node is added
    * while we are doing this. Also, unlike other uses of
    * unparkSuccessor, we need to know if CAS to reset status
    * fails, if so rechecking.
    /
    for (;😉 {
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue; // loop to recheck cases
    unparkSuccessor(h);
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue; // loop on failed CAS
    }
    if (h == head) // loop if head changed
    break;
    }
    }
    把头节点的waitStatus置为0,调用unparkSuccessor方法
    unparkSuccessor方法
    private void unparkSuccessor(Node node) {
    /

    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

     /** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);
    

    }
    拿到头结点的下一个节点,唤醒同步队列中阻塞的第一个线程,此时又会回到阻塞的地方doAcquireSharedInterruptibly方法中
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;😉 {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    这时候,被阻塞的第一个线程被唤醒,重新进入循环,会进入第一个if中,此时调用tryAcquireShared方法可以拿到一个许可,也就是r>=0,
    然后调用setHeadAndPropagate方法,这就是共享锁和独占锁的区别之一
    setHeadAndPropagate
    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    //设置新的头节点
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    //唤醒下一个节点
    doReleaseShared();
    }
    }
    在这里先把当前线程的节点设置为新的头节点,再次尝试唤醒下一个节点,这样有个好处,就是资源释放得快的话,线程就持续被唤醒,这也就保证了Semaphone可以限流的原因,同时刻,只要有线程释放资源,其他线程就可以拿到许可进而执行自己的业务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/44653.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centos部署jar包

第一步&#xff1a; 将IDEA中的项目打包为jar,将这个jar文件放到centos服务器上的目录里&#xff0c;我在opt新建api目录&#xff0c;将jar文件放入&#xff0c;如下图&#xff1a; 第二步&#xff1a; 将需要读取的配置文件也放入此目录(其他目录也可以&#xff0c;和脚本中…

搭建RAG系统就这么简单:LangChain|RAG是什么?

RAG是什么 “RAG”&#xff08;Retrieval-Augmented Generation&#xff09;是一种结合了检索&#xff08;Retrieval&#xff09;和生成&#xff08;Generation&#xff09;的人工智能技术&#xff0c;它在大模型中被需要的原因包括&#xff1a; 知识丰富性&#xff1a; 大模…

探索数据结构与算法的奇妙世界 —— Github开源项目推荐《Hello 算法》

在浩瀚的编程与计算机科学领域中&#xff0c;数据结构与算法无疑是每位开发者攀登技术高峰的必经之路。然而&#xff0c;对于初学者而言&#xff0c;这条路往往布满了荆棘与挑战。幸运的是&#xff0c;今天我要向大家推荐一个令人振奋的项目——《Hello Algo》&#xff0c;它正…

ubuntu使用kubeadm搭建k8s集群

一、卸载k8s kubeadm reset -f modprobe -r ipip lsmod rm -rf ~/.kube/# 自己选择性删除 坑点哦 rm -rf /etc/kubernetes/ rm -rf /etc/systemd/system/kubelet.service.d rm -rf /etc/systemd/system/kubelet.service rm -rf /usr/bin/kube* rm -rf /etc/cni rm -rf /opt/cn…

Prometheus + alermanager + webhook-dingtalk 告警

添加钉钉机器人 1. 部署 alermanager 1.1 下载软件包 wget https://github.com/prometheus/alertmanager/releases/download/v0.26.0/alertmanager-0.26.0.linux-amd64.tar.gz 网址 &#xff1a;Releases prometheus/alertmanager (github.com) 1.2 解压软件包 mkdir -pv …

医日健集团技术力量体现测试的背后

医日健集团覆盖式更新 科技日新月异的时代&#xff0c;医日健集团始终走在行业的前列。近日&#xff0c;医日健集团外勤技术人员全面对市场点位投放的数智药房进行了新系统升级和机器测试&#xff0c;这是医日健对于科技创新的最新尝试。 以客户体验为核心优化新体验 医日健集团…

Js 前置,后置补零的原生方法与补字符串 padStart及padEnd

在工作中&#xff0c;遇到了需要将不满八位的一个字符串进行后补0的操作&#xff0c;所以就在网上学习了关于js原生补充字符串的方法&#xff0c;然后用这篇博客记录下来。 目录 前置补充字符串 String.prototype.padStart() 后置补充字符串String.prototype.padEnd() 前置补…

【超音速 专利 CN117710683A】基于分类模型的轻量级工业图像关键点检测方法

申请号CN202311601629.7公开号&#xff08;公开&#xff09;CN117710683A申请日2023.11.27申请人&#xff08;公开&#xff09;超音速人工智能科技股份有限公司发明人&#xff08;公开&#xff09;张俊峰(总); 杨培文(总); 沈俊羽; 张小村 技术领域 本发明涉及图像关键点检测…

数据库MySQL下载安装

MySQL下载安装地址如下&#xff1a; MySQL :: Download MySQL Community Server 1、下载界面 2、点击下载 3、解压记住目录 4、配置my.ini文件 未完..

Vue.js学习笔记(五)抽奖组件封装——转盘抽奖

基于VUE2转盘组件的开发 文章目录 基于VUE2转盘组件的开发前言一、开发步骤1.组件布局2.布局样式3.数据准备 二、最后效果总结 前言 因为之前的转盘功能是图片做的&#xff0c;每次活动更新都要重做UI和前端&#xff0c;为了解决这一问题进行动态配置转盘组件开发&#xff0c;…

STM32智能仓储管理系统教程

目录 引言环境准备晶智能仓储管理系统基础代码实现&#xff1a;实现智能仓储管理系统 4.1 数据采集模块 4.2 数据处理与决策模块 4.3 通信与网络系统实现 4.4 用户界面与数据可视化应用场景&#xff1a;仓储管理与优化问题解决方案与优化收尾与总结 1. 引言 智能仓储管理系统…

7 月12日学习打卡--栈和队列的相互转换

hello大家好呀&#xff0c;本博客目的在于记录暑假学习打卡&#xff0c;后续会整理成一个专栏&#xff0c;主要打算在暑假学习完数据结构&#xff0c;因此会发一些相关的数据结构实现的博客和一些刷的题&#xff0c;个人学习使用&#xff0c;也希望大家多多支持&#xff0c;有不…

什么是STM32?嵌入式和STM32简单介绍

1、嵌入式和STM32 1.1.什么是嵌入式 除了桌面PC之外&#xff0c;所有的控制类设备都是嵌入式 嵌入式系统的定义&#xff1a;“用于控制、监视或者辅助操作机器和设备的装置”。 嵌入式系统是一个控制程序存储在ROM中的嵌入式处理器控制板&#xff0c;是一种专用的计算机系统。…

初阶数据结构速成

本篇文章算是对初阶数据结构的总结&#xff0c;内容较多&#xff0c;请耐心观看 基础概念部分 顺序表 线性表&#xff08; linear list &#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是⼀种在实际中⼴泛使 ⽤的数据结构&#xff0c;常⻅的线性表&#xff1a;…

机器学习——关于极大似然估计法的一些个人思考(通俗易懂极简版)

最近在回顾机器学习的一些相关理论知识&#xff0c;回顾到极大似然法时&#xff0c;对于极大似然法中的一些公式有些迷糊了&#xff0c;所以本文主要想记录并分享一下个人关于极大似然估计法的一些思考&#xff0c;如果有误&#xff0c;请见谅&#xff0c;欢迎一起前来探讨。当…

单元测试实施最佳方案(背景、实施、覆盖率统计)

1. 什么是单元测试&#xff1f; 对于很多开发人员来说&#xff0c;单元测试一定不陌生 单元测试是白盒测试的一种形式&#xff0c;它的目标是测试软件的最小单元——函数、方法或类。单元测试的主要目的是验证代码的正确性&#xff0c;以确保每个单元按照预期执行。单元测试通…

合肥高校大学智能制造实验室数字孪生可视化系统平台建设项目验收

合肥高校大学智能制造实验室近日迎来了一项重要时刻&#xff0c;数字孪生可视化系统平台建设项目顺利通过了验收。这一项目的成功实施&#xff0c;不仅标志着合肥高校在智能制造领域取得新的突破&#xff0c;为我国智能制造技术的发展注入新活力。 合肥高校智能制造实验室作为…

T972 切换至pdm 声音输入的方法

1.在hardware/amlogic/audio/audio_hal/audio_hw.c下&#xff0c;直接切换 在 static unsigned int select_port_by_device(struct aml_audio_device *adev) 中先强制切换为pdm 2.在device mk 配置文件中 #add fof fix the mic bug by jason 20230621 PRODUCT_PROPERTY_OVE…

MySQL 数据库基础概念

一、什么是数据库&#xff1f; 数据库&#xff08;Database&#xff09;是按照数据结构来组织、存储和管理数据的仓库。 每个数据库都有一个或多个不同的 API 用于创建&#xff0c;访问&#xff0c;管理&#xff0c;搜索和复制所保存的数据。 我们也可以将数据存储在文件中&…

MSPM0G3507(三十六)——超声波PID控制小车固定距离

效果图&#xff1a; 波形图软件是VOFA&#xff0c;B站有教程 &#xff0c;虽然有缺点但是非常简单。 视频效果&#xff1a; PID控制距离 之前发过只有超声波测距的代码&#xff0c;MSPM0G3507&#xff08;三十二&#xff09;——超声波模块移植代码-CSDN博客 SYSCFG配置&#…