Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析

自己一个人随便看看源码学习的心得,分享一下啦,不过我觉得还是建议去买本Java并发编程的书来看会比较好点,毕竟个人的理解有限嘛。

独占锁和共享锁

首先先引入这两个锁的概念:
独占锁即同一时刻只有一个线程才能获取到锁,Lock的实现类中ReentrantLock和WriteLock就是独占锁,所以独占锁也叫排它锁;

共享锁是同一时刻多线程能获取到的锁叫共享锁,即ReadLock;在读写锁共有的情况下,会出现共存的情况即读-读共存,读-写、写-写不能共存,他们会产生互斥。

数据结构

打开源码首先就得看下它的所有方法和变量,就会发现,AQS其实用的设计模式是模板模式,先讲一下啥是模板模式吧,直接上代码吧,用代码看会比较直观一点:

public abstract class UseTemplate{//这个定义的三个抽象接口方法public abstract void function1();public abstract void function2();public abstract void function3();//...还有其他属性或方法//这个方法就是模板方法,在继承的子类中实现好抽象方法,然后在调用父类的模板方法public final void runTemplate(){//在父类中写好功能执行的方法即可,也可设置权限,禁止重新覆盖function1();function2();function3();}}

AQS是采用的是一个双向链表队列,在该类中包含了一个Node类,双向链表就是右这个实现的

先了解一下这个静态内部类,以下为源代码(每个属性在下面都有解释):

static final class Node{/** Marker to indicate a node is waiting in shared mode *///用于共享锁的节点属性static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode *///用于独占锁使用属性static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED =  1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL    = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;/*** Status field, taking on only the values:*   SIGNAL:     The successor of this node is (or will soon be)*               blocked (via park), so the current node must*               unpark its successor when it releases or*               cancels. To avoid races, acquire methods must*               first indicate they need a signal,*               then retry the atomic acquire, and then,*               on failure, block.*   CANCELLED:  This node is cancelled due to timeout or interrupt.*               Nodes never leave this state. In particular,*               a thread with cancelled node never again blocks.*   CONDITION:  This node is currently on a condition queue.*               It will not be used as a sync queue node*               until transferred, at which time the status*               will be set to 0. (Use of this value here has*               nothing to do with the other uses of the*               field, but simplifies mechanics.)*   PROPAGATE:  A releaseShared should be propagated to other*               nodes. This is set (for head node only) in*               doReleaseShared to ensure propagation*               continues, even if other operations have*               since intervened.*   0:          None of the above** The values are arranged numerically to simplify use.* Non-negative values mean that a node doesn't need to* signal. So, most code doesn't need to check for particular* values, just for sign.** The field is initialized to 0 for normal sync nodes, and* CONDITION for condition nodes.  It is modified using CAS* (or when possible, unconditional volatile writes).*///当前节点的状态,状态值即上面几个状态值,有相对应的英文解释,就不翻译了volatile int waitStatus;/*** Link to predecessor node that current node/thread relies on* for checking waitStatus. Assigned during enqueuing, and nulled* out (for sake of GC) only upon dequeuing.  Also, upon* cancellation of a predecessor, we short-circuit while* finding a non-cancelled one, which will always exist* because the head node is never cancelled: A node becomes* head only as a result of successful acquire. A* cancelled thread never succeeds in acquiring, and a thread only* cancels itself, not any other node.*/volatile Node prev;//前一个节点指向/*** Link to the successor node that the current node/thread* unparks upon release. Assigned during enqueuing, adjusted* when bypassing cancelled predecessors, and nulled out (for* sake of GC) when dequeued.  The enq operation does not* assign next field of a predecessor until after attachment,* so seeing a null next field does not necessarily mean that* node is at end of queue. However, if a next field appears* to be null, we can scan prev's from the tail to* double-check.  The next field of cancelled nodes is set to* point to the node itself instead of null, to make life* easier for isOnSyncQueue.*/volatile Node next;//下一个节点指向/*** The thread that enqueued this node.  Initialized on* construction and nulled out after use.*/volatile Thread thread;//当前节点的线程/*** Link to next node waiting on condition, or the special* value SHARED.  Because condition queues are accessed only* when holding in exclusive mode, we just need a simple* linked queue to hold nodes while they are waiting on* conditions. They are then transferred to the queue to* re-acquire. And because conditions can only be exclusive,* we save a field by using special value to indicate shared* mode.*/Node nextWaiter;//使用在Condition上面,即下一个等待节点/*** Returns true if node is waiting in shared mode.*/final boolean isShared() {return nextWaiter == SHARED;}/*** Returns previous node, or throws NullPointerException if null.* Use when predecessor cannot be null.  The null check could* be elided, but is present to help the VM.** @return the predecessor of this node*///获取前一个节点final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() {    // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}

了解完这个之后,在AQS类中有三个变量:

    /*** Head of the wait queue, lazily initialized.  Except for* initialization, it is modified only via method setHead.  Note:* If head exists, its waitStatus is guaranteed not to be* CANCELLED.*/private transient volatile Node head;//当前头部节点/*** Tail of the wait queue, lazily initialized.  Modified only via* method enq to add new wait node.*/private transient volatile Node tail;//当前尾节点/*** The synchronization state.*/private volatile int state;//拿锁的状态

AQS中还有一个内部类ConditionObject,这个类继承了Condition接口,这个类也有一个队列,不过不是双向队列,单向队列,也是通过Node实现的:

        /** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;

AQS方法

独占式获取:​ accquire()、​ acquireInterruptibly()、​ tryAcquireNanos()

共享式获取:​ acquireShared()、​ acquireSharedInterruptibly()、​ tryAcquireSharedNanos()

独占式释放锁:​ release()

共享式释放锁:​ releaseShared()

需要子类覆盖的流程方法

​独占式获取: tryAcquire()

​独占式释放: tryRelease()

​共享式获取: tryAcquireShared()

共享式释放: tryReleaseShared()

这个同步器是否处于独占模式 isHeldExclusively()

同步状态state:1)、​ getState():获取当前的同步状态;2)、​ setState(int):设置当前同步状态;3)、​ compareAndSetState(int,int) 使用CAS设置状态,保证状态设置的原子性.

AQS使用方法及流程

需要继承AQS这个类并实现他的方法,我们先自己写一个独占锁的实现方式:

	/*** state 当为0的时候表示还没有拿到锁 为1的时候为拿到锁了* @author 26225**/static final class Sync extends AbstractQueuedSynchronizer{/*** */private static final long serialVersionUID = 1L;@Overrideprotected boolean tryAcquire(int arg) {if(getState() == 1)return false;else {/*** 需使用compareAndSetState() CAS原子操作 而不是使用setState(1),因为使用了volatile修饰了state 虽然保证了可见性,但是修改还是得保证原子操作*/if(compareAndSetState(getState(), arg)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}}@Overrideprotected boolean tryRelease(int arg) {if(getState() == 0 )throw new UnsupportedOperationException();setExclusiveOwnerThread(null);setState(arg);return true;}@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}Condition newCondition() {return new ConditionObject();}}private final Sync sync = new Sync();public void lock() {sync.acquire(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.tryAcquire(1);}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {// TODO Auto-generated method stubreturn sync.tryAcquireNanos(1, unit.toNanos(time));}public void unlock() {sync.release(0);}public Condition newCondition() {return sync.newCondition();}

先简单的讲解一下该段代码:

在调用Lock的unlock()方法时,需调用AQS的acquire()方法,我们先看一下这个方法调用:

/*** Acquires in exclusive mode, ignoring interrupts.  Implemented* by invoking at least once {@link #tryAcquire},* returning on success.  Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success.  This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument.  This value is conveyed to*        {@link #tryAcquire} but is otherwise uninterpreted and*        can represent anything you like.*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

其实就是先调用我们自己实现的方法,判断拿锁的状态,并且设置这个状态;如果拿到了锁的话,就不用管它,没有拿到的话,就得加入到等待队列中,我们看一下加入等待队列的方法:

    /*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

 判断头部节点是否存在,如果不存在的话就调用enq()方法,判断当前尾节点存不存在,存在的话,则将当前节点通过CAS操作设置成尾节点,如果不存在则将当前线程包装成一个Node对象设置成头节点,并且将头部和尾部设置成同一个

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

 这个方法是加入到队列,就是设置当前Node对象的前一个节点指向以及后一个节点指向,并且调用parkAndCheckInterrupt()通过LockSupport将当前线程进入到等待状态。

以上就是调用lock()方法基本流程,现在看一下代用unlock()的基本流程,执行的是AQS的release()

    /*** Releases in exclusive mode.  Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument.  This value is conveyed to*        {@link #tryRelease} but is otherwise uninterpreted and*        can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

 在释放锁的过程中,首先还原之前的拿锁状态,还原之后将队列头部节点的下一个节点通过LockSupport.unpark()进行唤醒。

AbstractOwnableSynchronizer

在JDK1.6之后AQS继承了AbstractOwnableSynchronizer这个类,其实这个类主要是用来记录当前访问的线程:

 /*** The current owner of exclusive mode synchronization.*/private transient Thread exclusiveOwnerThread;/*** Sets the thread that currently owns exclusive access.* A {@code null} argument indicates that no thread owns access.* This method does not otherwise impose any synchronization or* {@code volatile} field accesses.* @param thread the owner thread*/protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}/*** Returns the thread last set by {@code setExclusiveOwnerThread},* or {@code null} if never set.  This method does not otherwise* impose any synchronization or {@code volatile} field accesses.* @return the owner thread*/protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}

AQS的基本流程差不多分析完了,讲的有问题的话,还请大佬指正!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/537172.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

采集用python还是火车头_我才知道爬虫也可以酱紫--火车采集器

我才知道爬虫还可以这样—火车采集器的使用说在前面额。。。好吧&#xff0c;我这一个三毛钱的屌丝也开始步入实习阶段了&#xff0c;在北京其实也挺好的&#xff0c;虽说压力大&#xff0c;但是今后就业机会也相对而言大一些。好了&#xff0c;说回今天的主题&#xff0c;之前…

mvn 使用中的错误

出现这种错误的时候&#xff1a;mvn Error building POM may not be this projects POM&#xff0c;报的是那个jar 包&#xff0c;就删除那个jar 包&#xff0c;重新mvn clean install .ok

Java并发编程之FutureTask源码解析

上次总结一下AQS的一些相关知识&#xff0c;这次总结了一下FutureTask的东西&#xff0c;相对于AQS来说简单好多呀 之前提到过一个LockSupport的工具类&#xff0c;也了解一下这个工具类的用法&#xff0c;这里也巩固一下吧 /*** Makes available the permit for the given th…

java 删除二维数组中的null_避免在Java中检查Null语句

1.概述通常&#xff0c;在Java代码中处理null变量、引用和集合很棘手。它们不仅难以识别&#xff0c;而且处理起来也很复杂。事实上&#xff0c;在编译时无法识别处理null的任何错误&#xff0c;会导致运行时NullPointerException。在本教程中&#xff0c;我们将了解在Java中检…

Java并发编程之并发容器ConcurrentHashMap(JDK1.7)解析

最近看了一下ConcurrentHashMap的相关代码&#xff0c;感觉JDK1.7和JDK1.8差别挺大的&#xff0c;这次先看下JDK1.7是怎么实现的吧 哈希&#xff08;hash&#xff09; 先了解一下啥是哈希&#xff08;网上有很多介绍&#xff09;&#xff0c;是一种散列函数&#xff0c;简单来…

带控制端的逻辑运算电路_分别完成正整数的平方、立方和阶乘的运算verilog语言...

练习&#xff1a;设计一个带控制端的逻辑运算电路&#xff0c;分别完成正整数的平方、立方和阶乘的运算。 //--------------myfunction---------- modulemyfunction(clk,n,result,reset,sl); output[6:0]result; input[2:0] n; input reset,clk; input [1:0] sl; reg[6:0]resul…

Java并发编程之并发容器ConcurrentHashMap(JDK1.8)解析

这个版本ConcurrentHashMap难度提升了很多&#xff0c;就简单的谈一下常用的方法就好了&#xff0c;可能有些讲的不太清楚&#xff0c;麻烦发现的大佬指正一下 主要数据结构 1.8将Segment取消了&#xff0c;保留了table数组的形式&#xff0c;但是不在以HashEntry纯链表的形式…

simulink显示多个数据_如何在 Simulink 中使用 PID Tuner 进行 PID 调参?

作者 | 安布奇责编 | 胡雪蕊出品 | CSDN(ID: CSDNnews)本文为一篇技术干货&#xff0c;主要讲述在Simulink如何使用PID Tuner进行PID调参。PID调参器( PIDTuner)概述1.1 简介使用PID Tuner可以对Simulink模型中的PID控制器&#xff0c;离散PID控制器&#xff0c;两自由度PID控制…

Java并发编程之堵塞队列介绍以及SkipList(跳表)

堵塞队列 先了解一下生产者消费者模式&#xff1a; 生产者就是生产数据的一方&#xff0c;消费者就是消费数据的另一方。在多线程开发中&#xff0c;如果生产者处理速度很快&#xff0c;而消费者处理速度很慢&#xff0c;那么生产者就必须等待消费者处理完&#xff0c;才能继…

python生成list的时候 可以用lamda也可以不用_python 可迭代对象,迭代器和生成器,lambda表达式...

分页查找#5.随意写一个20行以上的文件(divmod)# 运行程序&#xff0c;先将内容读到内存中&#xff0c;用列表存储。# l []# 提示&#xff1a;一共有多少页# 接收用户输入页码&#xff0c;每页5条&#xff0c;仅输出当页的内容def read_page(bk_list,n,endlineNone):startline …

数据挖掘技术简介[转]

关键词&#xff1a; 关键词&#xff1a;数据挖掘 数据集合 1. 引言  数据挖掘(Data Mining)是从大量的、不完全的、有噪声的、模糊的、随机的数据中提取隐含在其中的、人们事先不知道的、但又是潜在有用的信息和知识的过程。随…

树莓派安装smbus_树莓派使用smbus不兼容问题(no module named 'smbus')

树莓派使用smbus不兼容问题(no module named ‘smbus’)python3.5–3.6可以使用smbus2代替smbus1. 先参考以下方法&#xff1a;github讨论树莓派社区2.Pypi上可以下载smbus2smbus2PyPi介绍&#xff1a;当前支持的功能有&#xff1a;获取i2c功能(I2C_FUNCS)read_bytewrite_byter…

Java并发编程之线程池ThreadPoolExecutor解析

线程池存在的意义 平常使用线程即new Thread()然后调用start()方法去启动这个线程&#xff0c;但是在频繁的业务情况下如果在生产环境大量的创建Thread对象是则会浪费资源&#xff0c;不仅增加GC回收压力&#xff0c;并且还浪费了时间&#xff0c;创建线程是需要花时间的&…

面向过程的门面模式

{*******************************************************}{ }{ 业务逻辑一 }{ }{ 版权所有 (C) 2008 陈…

Java并发编程之线程定时器ScheduledThreadPoolExecutor解析

定时器 就是需要周期性的执行任务&#xff0c;也叫调度任务&#xff0c;在JDK中有个类Timer是支持周期性执行&#xff0c;但是这个类不建议使用了。 ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor线程池&#xff0c;在Executors默认创建了两种&#xff1a; newSin…

python xml转换键值对_Python 提取dict转换为xml/json/table并输出

#!/usr/bin/python#-*- coding:gbk -*-#设置源文件输出格式import sysimport getoptimport jsonimport createDictimport myConToXMLimport myConToTabledef getRsDataToDict():#获取控制台中输入的参数&#xff0c;并根据参数找到源文件获取源数据csDict{}try:#通过getopt获取…

应用开发框架之——根据数据表中的存储的方法名称来调用方法

功用一&#xff1a;在框架里面根据存储在数据表中的方法名来动态调用执行方法。 unit Unit1; interface uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, StdCtrls; type TForm1 class(TForm) Button1: TButton; procedu…

Spring IOC容器组件注入的几种方式

整理一下之前Spring的学习笔记&#xff0c;大致有一下几种Spring注入到容器中的方法: 1&#xff09;、配置在xml的方式。 2&#xff09;、开启包扫描ComponentScan使用Component&#xff0c;Service&#xff0c;Controller&#xff0c;Repository&#xff08;其实后三个都继承…

我们是如何拿下Google和Facebook Offer的?

http://posts.careerengine.us/p/57c3a1c1a09633ee7e57803c 大家好&#xff0c;我是小高&#xff0c;CMU CS Master&#xff0c;来Offer第一期学员&#xff0c;2014年初在孙老师的带领下我在几个月的时间内进入了Yahoo&#xff0c;并工作了近2年。2016年初&#xff0c;Yahoo工作…

Spring中BeanFactory和FactoryBean的区别

先介绍一下Spring的IOC容器到底是个什么东西&#xff0c;都说是一个控制反转的容器&#xff0c;将对象的控制权交给IOC容器&#xff0c;其实在看了源代码之后&#xff0c;就会发现IOC容器只是一个存储单例的一个ConcurrentHashMap<String, BeanDefinition> BeanDefiniti…