JUC核心

AbstractQueuedSynchronizer

一.AQS原理

1.等待队列—CLH锁

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

主要原理图如下:

1.1 AQS内部类 (数据结构–Node)

先来看下AQS中最基本的数据结构—Node,Node即为上面CLH变体队列中的节点。

解释一下几个方法和属性值的含义:

方法和属性值含义
waitStatus当前节点在队列中的状态
thread表示处于该节点的线程
prev前驱指针 用于处理cancelled,如果一个节点被取消,他的后继者通常会重新链接到一个未取消的前节点
predecessor返回前驱节点,没有的话抛出空指针异常
nextWaiter指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍)
next后继指针 用于实现阻塞,前驱节点遍历下一个链接确认是哪个线程来通知下一个节点唤醒,同时避免入队时的节点竞争。

两种锁模式:

模式含义
SHARED表示线程以共享的模式等待锁
EXCLUSIVE表示线程正在以独占的方式等待锁
 //指示节点在共享模式下等待的标记   static final Node SHARED = new Node();//指示节点以独占模式等待的标记static final Node EXCLUSIVE = null;

waitStatus有下面几个枚举值:

枚举含义
0当一个Node被初始化的时候的默认值或已经释放锁
CANCELLED为1,表示线程获取锁的请求已经取消了
CONDITION为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE为-3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL为-1,表示线程已经准备好了,就等资源释放了
1.2 AQS 方法

首先我们先想到是获取锁

  • 尝试获取锁,不过有没有获取到,立即返回是否获取的标识
  • 必须获取锁,如果锁被占用,进行等待

AQS有以下两个方法对应上述两种操作分别是tryAcquireacquire,下面以独占方式获取锁为例 :

    // 该方法必须被子类重写//以独占的方式获取锁,成功返回true 失败返回falseprotected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}// final 修饰,子类不可修改。 这是独占式获取锁的主逻辑public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire

tryAcquire是一个被protected修饰的方法,参数是int值,代表对int state 的增加操作,返回值是boolean,代表是否成功获得锁。 在子类自己实现中,需要更新 同步状态state和当前锁的持有线程exclusiveOwnerThread
tryAcquire必须由子类实现,这样可以在获取锁的时候加上自己的业务逻辑,比如是否“可重入”等
tryAcquire返回true,线程获得锁,此时可以对相应的共享资源进行操作,使用完释放。返回false,上层逻辑上不想等待锁,可以自己进行处理。 如果想要等待锁,可以直接调用acquire方法,该方法封装了复杂的排队处理逻辑,非常易用。

acquire

acquire方法 被final修饰,不能overwrite,道哥看起来对自己的等待并获取锁 很自信哈。

if判断条件包含了两部分:
!tryAcquire(arg)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

!tryAcquire 为false 表示获取锁成功,无需参与排队。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 嵌套了addWaiter(Node.EXCLUSIVE),我们先看addWaiter

addwaiter
    private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {  ---1pred.next = node;                 ---2 return node;                      ---3}}enq(node);                                ---4return node;}

顾名思义,这个方法的作用就是将当前线程封装成一个Node,然后加入等待队列,返回值即为该Node。 逻辑是新建一个Node对象,然后插入队尾。我们多线程场景下,假设存在多个线程调用addWait方法。

新建pred节点引用,指向当前的尾节点,如果尾节点不为空,进行下面三步操作:
1.将当前节点的prev指针指向pred节点(尾节点)
2.尝试通过CAS操作将当前节点置为尾节点
a. 如果返回false,说明pred节点已经不是尾节点了,其他线程修改了尾节点,退出判断,执行enq方法,准备重新入队。
b.如果返回true,CAS操作成功,pred为尾节点,CAS使得当前节点成为尾节点。那么需要将pred的next指针指向当前节点。这一步是不会存在线程安全的,因为其他节点不会操作pred节点了。

多线程环境下执行,容易迷糊的细节,也是理解该方法的重点。
1.程序执行到代码中 1 时,pred引用的对象很可能已经不是尾节点了,所以CAS失败;
2. 如果CAS成功,代码块2,3执行时不再具有原子性,但不会有线程不安全,此时pred节点和当前节点的相对位置已确定,其他线程只能在新的尾节点后插入;
3. 需要注意的是,当前后两个节点建立连接的时候,首先是后节点的pre指针指向前节点,当后节点成为尾节点后,前节点的next才会指向后节点。

如果理解了这些,什么情况下会执行到代码块4呢?

  • 队列是空的
  • 快速插入失败,需要完整的插入流程,也就是代码1处执行失败。
enq

看看enq 完整的插入流程。

private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

逻辑是:最外层是死循环,首先判断当前队列是不是空(tail==null),先初始化 头尾指针指向同一个节点,也就是第一个节点(CLH队列需要一个虚拟头结点启动,在第一次入队时构造该节点并设置头尾指针),然后尾节点插入,失败了不断重试CAS。

现在等待队列入队完成了,怎么出队获取锁的权限呢?我们接着来看acquireQueued源码

acquireQueued
    final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

逻辑是: 定义一个获取锁的标识符failed,默认是失败的。定义一个中断的标识符interrupted ,默认是非中断的。
死循环中逻辑:
1.predecessor方法拿到当前节点的前置节点,为空抛空指针异常;
2.当head指针指向前置节点,说明当前节点有权限去竞争锁了,这是一种约定。如果获取锁成功,将head指针指向当前节点,将前置节点的next指针置为null,利于GC回收,将获取锁的标识符置为成功,返回未中断的标识符。
3.不满足2中的条件则判断shouldParkAfterFailedAcquireparkAndCheckInterrupt,看函数名,首先判断是否挂起等待,需要就挂起,并且判断外部是否调用线程中断;如果不需要,继续尝试获取锁。
4.finally块,判断是否竞争锁失败,如果是,取消当前节点的获取锁的行为,并从队列移出。
5.返回中断的标识符,在上层acquire方法判断是否中断,来选择是否调用当前线程中断,这里属于一种延迟中断机制。

这里有三点注意:

  1. 一个约定:head节点代表当前持有锁的节点。若当前节点的前置节点是head,那么该节点就开始自选的获取锁。一旦head节点释放,当前节点就能第一时间获取到
  2. shouldParkAfterFailedAcquireparkAndCheckInterrupt 的具体细节。
  3. nterrupted变量最终被返回出去后,上层acquire判断该值,来选择是否调用线程中断。

接下来我们看看上面2中的两个方法。

shouldParkAfterFailedAcquire

该方法的作用是 没有竞争锁权限的 或者 竞争失败的节点,是否应该被挂起。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}

逻辑是:
1.若前置节点predwaitstatus是signal,说明pred节点也在等待锁,并且之后会唤醒当前节点,所以当前节点可以挂起休息,返回true
2.如果前置节点ws>0,说明pred节点是cancelled,将其从队列移出,通过从后向前遍历,将pred指向遍历中第一个状态是非cancel的节点,相当于链式删除被cancel的节点,然后返回false,代表当前节点不需要被挂起,因为pred指向新的Node,需要重试外层逻辑。
3.除此之外ws还有两种可能,0和Propagate,为什么不可能是condition,因为waitstatus只有在其他条件模式下,才会被修改为condition,这里不会出现。 并且只有在共享模式下,才可能出现waitStatus为propagate,暂时不用管。那么在独占模式下,ws在这里只会出现 0的情况,代表pred出于初始化默认状态,通过CAS修改pred的状态为signal,然后返回false,重试外层逻辑。

这个方法涉及到对Node的waitstatus修改,相比比较关键。

如果shouldParkAfterFailedAcquire返回false,进行下一轮重试;如果返回true,代表当前节点需要被挂起,则执行parkAndCheckInterrupt方法。

parkAndCheckInterrupt
     private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

LockSupport.park(this)本质时通过Unsafe下的native方法调用操作系统原语将当前线程挂起。
此时当前Node中的线程将阻塞在此处,知道持有锁的线程调用release方法,唤醒后续节点。

Thread.interrupted() 有什么作用呢? 这是因为在线程挂起期间,线程可能被中断,park期间无法响应中断,所以只有当线程唤醒时,检查park期间是否被调用中断,有的话,传递出去,通过外层accquire 来响应中断。

总结以下,通过accquireQueued这个方法,我们可以明白,如果当前线程所在的系欸但处于头节点后一个,那么它将不断尝试自旋拿锁,直到成功,否则进行判断,是否需要挂起。这样就能保证head之后的一个节点在自旋CAS获取锁,其他线程在挂起或正在被挂起。这样最大限度避免无用的自旋消耗CPU。

上面我们讲到 大量线程被挂起,那么就会有被唤醒的时机。也提到,当持有所得线程释放了锁,会唤醒后续节点。 我们来看看 release方法。

release
    public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

tryAcquire一样,tryRelease也是AQS开放给上层自由实现的抽象方法。
release中,假如尝试释放锁成功,下一步就要唤醒等待队列的其他节点,这里我们重点看下unparkSucessor这个方法。参数是head Node。

unparkSucessor
    private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

首先拿到head节点的waitStatus,如果不为0,将其置为0,表示锁释放。接下来后续指针为null,或者处于canceled状态,那么从后向前遍历,找到除了head的最靠前的非canceled状态的node,对其unpark,让其尝试拿锁。

这里注意为什么要从尾节点向前遍历,而不是从前往后?

  1. 因为我们在addWaiter方法中,是先将后节点的prev指针指向前节点;然后原子操作将尾指针指向新插入的节点。然后才将前节点,next指针指向当前节点 操作不是原子性的。
  2. 新插入的节点是先感知到前节点的,如果从前往后遍历,可能前节点的next 还为建立好,遍历就结束了。

这时,拿锁 、挂起 、释放、唤醒 都能够有条不紊,且高效的运行。

2.同步状态—state

同步状态State来控制整体可重入的情况。State是Volatile修饰的,用于保证一定的可见性和有序性。

 /*** The synchronization state.*/
private volatile int state;

State这个字段主要的过程:

  1. State初始化的时候为0,表示没有任何线程持有锁。
  2. 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
  3. 解锁也是对这个字段-1,一直到0,此线程对锁释放。
  4. 共享锁是初始化state为n,每次获取锁-1,直到减到0,其他线程阻塞。

下面提供了几个访问这个字段的方法:

方法名描述
protected final int getState()获取State的值
protected final void setState(int newState)设置State的值
protected final boolean compareAndSetState(int expect, int update)使用CAS方式更新State

这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式共享模式(加锁过程)。


对于我们自定义的同步工具,需要自定义获取同步状态释放状态的方式,也就是AQS架构图中的第一层:API层。

二.AQS在JUC中应用

1.ReentrantLock-可重入锁

什么是ReentrantLock?
ReentrantLock基于AQS,在并发编程中,它可以实现公平锁非公平锁来对共享资源进行同步,同时,和synchronized一样,ReentrantLock支持可重入,除此之外,ReentrantLock调度上更灵活,支持更多丰富的功能。

源码分析:

public class ReentrantLock implements Lock, java.io.Serializable {}

ReentrantLock 实现了lock接口,lock接口的功能如下:

Lock的意义在于提供了区别于synchronized的另一种具有更多广泛操作的同步方式,它能支持更多灵活的结构,并且可以关联多个Condition对象,Condition是Java提供的一个用于线程通信的接口。
Lock有如下6个方法:

  • void lock(),顾名思义就是用来获取锁,假如当前锁被其他线程占用,那么将会等待直到获取为止
  • void lockInterruptibly(),和lock类似,也是用来获取锁,但区别在于,假如当前线程在等待锁的过程中中断了,那么将会退出等待,并抛出异常
  • boolean tryLock(),尝试获取锁,无论是否成功获取都立即返回,返回值代表是否获取锁。
  • boolean trylock(long time,TimeUnit unit),尝试获取锁并设定了等待超时时间,返回值代表是否获取锁。
  • void unlock(),释放锁。
  • Condition newCondition(),新建一个绑定在当前lock对象上的Condition对象。

-Condition对象是什么? 简单来说,他表示一个条件,不同线程可以通过该条件来进行通信。比如某个线程可以通过该对象的await方法注册在condition对象进行等待,然后通过condition对象的signal方法将该线程唤醒,这有点类似Object锁的wait和notify方法。但不同的是,一个lock对象可以关联多个Condition对象,多个线程可以被绑定在不同的Condition对象上,这样就可以分组等待唤醒,此外,Condition对象还提供了和限时、中断相关的功能,丰富了线程的调度策略。

ReentrantLock ,核心关注三个方面:

  • 属性:sync
  • 内部类:Sync、NonFairSync、FairSync
  • 方法:
    继承、实现方法: 实现Lock的方法
    私有方法:暂不关注
1.1 属性

ReentrantLock 只有一个属性,Sync类型的变量sync。sync被final修饰,意味着一旦初始化,就不可修改。在构造函数中初始化。

private final Sync sync;//构造函数
public ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

有参构造,通过传入boolean值 指定初始化为公平锁还是非公平锁。

1.2 内部类

Sync
Sync 继承AQS,除了lock方法和readObject方法外,其他都是final修饰的,意味着子类不可修改,说明这些对AQS内部方法的拓展实现,已经完整可靠,不希望被外部破坏。

   //继承AQSabstract static class Sync extends AbstractQueuedSynchronizer {//抽象,由子类去实现abstract void lock();//不公平模式获取锁final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}//可重入锁的释放流程protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())// 当前线程不是持有锁的线程,抛出异常throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}//当前线程是否独占式持有锁protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}//锁实例化Condition对象final ConditionObject newCondition() {return new ConditionObject();}// 当前锁的持有线程final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}//当前线程 持有锁的重入数final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}//当前锁是否被任何线程持有final boolean isLocked() {return getState() != 0;}//用于反序列化private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();setState(0); // reset to unlocked state}}
  • nonfairTryAcquire,该方法 逻辑比较简单:
    1.获取state,该值由AQS维护。
    2.当state=0,代表锁状态为空闲,便可以进行一次CAS来原子的更改state,state更改成功,代表获取了锁,将当前线程设置为独占线程,并返回true,否则返回false。
    3. 当state!=0,说明锁被占用,判断当前线程是否已经是独占线程,是的话,就可"重入获取锁",state+1,否则,返回false。
    这里有个细节,判断不为负数? 这是因为 int为16位,表示最大的有符号为2147483647,一旦超出便会溢出变为负数,所以我们可以这样理解,ReentrantLock允许的最大次数为214783647。

  • tryRelease, 释放锁的通用操作,这里返回的boolean类型值,不是代表是否释放成功,而是代表是否完全释放(因为存在重入,所以需要多次释放)。

公平锁和非公平锁
FIFO代表着公平锁,比如AQS中介绍的FIFO队列,实现的就是公平锁。非公平锁就是锁的分配不用按照请求锁的顺序,比如是抢占式的。
公平锁保证了只要你排队,一定可以获取锁,抢占式的非公平锁,很可能某个线程一直抢不到锁,又不断有新的线程加入进来抢锁,线程一直处于阻塞状态,这种状态称为线程饥饿

非公平锁的设计意义: 非公平锁意味着请求锁的线程可能在前面的休眠线程恢复前拿到锁,这样就有可能提高饼发的性能。 当唤醒挂起线程是,线程状态切换之间会产生短暂延时。 非公平锁就可以利用这段时间完成操作。这是非公平锁某些时候比公平锁性能好的原因。

我们来看下Sync的两种实现 NonFairSyncFairSync

NonFairSync

    static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock.  Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}

重写了 sync 的 lock和 AQS的tryAcquire方法,

  • lock 方法 先尝试一次获取锁,CAS成功,当前线程获取锁,失败,调用AQS的acquire方法。

这里两个问题值得讨论:
1.可重入性
当程序调用acqiue,先调用tryAcquire来获取锁,而nonfairTryacquire内部实现了可重入性,所以满足
2.非公平性
当程序调用lock的时候,会进行一次CAS,CAS失败,调用acquire,在acquire中,先调用一次tryAcquire,而nonfairTryAcquire会尝试先获取锁,如果锁被占用且不可重入,那么就会继续执行AQS后续的排队流程,虽然只有两次尝试抢占,但也体现出非公平性。

FairSync

    static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire.  Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}

重写了 sync 的 lock和 AQS的tryAcquire方法,

  • lock直接调用AQS的acquire

    1.可重入性
    调用acquire的时候,会首先调用一次tryAcquire,在tryAcquire方法内部,我们会看到锁被占用时候,将会进行可重入判断,和nonfairTryAcquire中类似。
    2.公平性
    tryAcquire中,首先判断锁是否空闲,空闲,此时不是直接CAS获取锁,而是需要判断是否存在前置等待节点。 如果不存在,说明在队列中确实已经轮到当前线程尝试获取锁,否则,执行AQS的后续等待逻辑,这里体现出了公平性。

1.3 方法

lock
该方法只是对sync对象的lock方法调用。在FairSync和NonfairSync 有具体的实现。

    public void lock() {sync.lock();}

lockInterruptibly
该方法和lock方法的区别在于,当线程在等待锁的期间,是否立即响应中断。lock方法,线程会在等待获取锁之后,再响应中断,这点在AQS中实现了。 lockInterruptibly方法中,若线程在等待获取锁期间被调用了中断,那么将会立即抛出中断异常。调用了sync的acquireInterruptibly,该方法的实现存在于AQS内部。

    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}

tryLock
直接调用sync对象的nonfairTryAcquire方法,无论ReentrantLock被指定为公平还是非公平,tryLock都是非公平的,这样设计是合理的。也是把nonfairTryAcquire放在Sync的原因。

引申:

Java AQS 核心数据结构-CLH 锁
从ReentrantLock的实现看AQS的原理及应用

JUC核心

AbstractQueuedSynchronizer

一.AQS原理

1.等待队列—CLH锁

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

主要原理图如下:

1.1 AQS内部类 (数据结构–Node)

先来看下AQS中最基本的数据结构—Node,Node即为上面CLH变体队列中的节点。

解释一下几个方法和属性值的含义:

方法和属性值含义
waitStatus当前节点在队列中的状态
thread表示处于该节点的线程
prev前驱指针 用于处理cancelled,如果一个节点被取消,他的后继者通常会重新链接到一个未取消的前节点
predecessor返回前驱节点,没有的话抛出空指针异常
nextWaiter指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍)
next后继指针 用于实现阻塞,前驱节点遍历下一个链接确认是哪个线程来通知下一个节点唤醒,同时避免入队时的节点竞争。

两种锁模式:

模式含义
SHARED表示线程以共享的模式等待锁
EXCLUSIVE表示线程正在以独占的方式等待锁
 //指示节点在共享模式下等待的标记   static final Node SHARED = new Node();//指示节点以独占模式等待的标记static final Node EXCLUSIVE = null;

waitStatus有下面几个枚举值:

枚举含义
0当一个Node被初始化的时候的默认值或已经释放锁
CANCELLED为1,表示线程获取锁的请求已经取消了
CONDITION为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE为-3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL为-1,表示线程已经准备好了,就等资源释放了
1.2 AQS 方法

首先我们先想到是获取锁

  • 尝试获取锁,不过有没有获取到,立即返回是否获取的标识
  • 必须获取锁,如果锁被占用,进行等待

AQS有以下两个方法对应上述两种操作分别是tryAcquireacquire,下面以独占方式获取锁为例 :

    // 该方法必须被子类重写//以独占的方式获取锁,成功返回true 失败返回falseprotected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}// final 修饰,子类不可修改。 这是独占式获取锁的主逻辑public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire

tryAcquire是一个被protected修饰的方法,参数是int值,代表对int state 的增加操作,返回值是boolean,代表是否成功获得锁。 在子类自己实现中,需要更新 同步状态state和当前锁的持有线程exclusiveOwnerThread
tryAcquire必须由子类实现,这样可以在获取锁的时候加上自己的业务逻辑,比如是否“可重入”等
tryAcquire返回true,线程获得锁,此时可以对相应的共享资源进行操作,使用完释放。返回false,上层逻辑上不想等待锁,可以自己进行处理。 如果想要等待锁,可以直接调用acquire方法,该方法封装了复杂的排队处理逻辑,非常易用。

acquire

acquire方法 被final修饰,不能overwrite,道哥看起来对自己的等待并获取锁 很自信哈。

if判断条件包含了两部分:
!tryAcquire(arg)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

!tryAcquire 为false 表示获取锁成功,无需参与排队。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 嵌套了addWaiter(Node.EXCLUSIVE),我们先看addWaiter

addwaiter
    private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {  ---1pred.next = node;                 ---2 return node;                      ---3}}enq(node);                                ---4return node;}

顾名思义,这个方法的作用就是将当前线程封装成一个Node,然后加入等待队列,返回值即为该Node。 逻辑是新建一个Node对象,然后插入队尾。我们多线程场景下,假设存在多个线程调用addWait方法。

新建pred节点引用,指向当前的尾节点,如果尾节点不为空,进行下面三步操作:
1.将当前节点的prev指针指向pred节点(尾节点)
2.尝试通过CAS操作将当前节点置为尾节点
a. 如果返回false,说明pred节点已经不是尾节点了,其他线程修改了尾节点,退出判断,执行enq方法,准备重新入队。
b.如果返回true,CAS操作成功,pred为尾节点,CAS使得当前节点成为尾节点。那么需要将pred的next指针指向当前节点。这一步是不会存在线程安全的,因为其他节点不会操作pred节点了。

多线程环境下执行,容易迷糊的细节,也是理解该方法的重点。
1.程序执行到代码中 1 时,pred引用的对象很可能已经不是尾节点了,所以CAS失败;
2. 如果CAS成功,代码块2,3执行时不再具有原子性,但不会有线程不安全,此时pred节点和当前节点的相对位置已确定,其他线程只能在新的尾节点后插入;
3. 需要注意的是,当前后两个节点建立连接的时候,首先是后节点的pre指针指向前节点,当后节点成为尾节点后,前节点的next才会指向后节点。

如果理解了这些,什么情况下会执行到代码块4呢?

  • 队列是空的
  • 快速插入失败,需要完整的插入流程,也就是代码1处执行失败。
enq

看看enq 完整的插入流程。

private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

逻辑是:最外层是死循环,首先判断当前队列是不是空(tail==null),先初始化 头尾指针指向同一个节点,也就是第一个节点(CLH队列需要一个虚拟头结点启动,在第一次入队时构造该节点并设置头尾指针),然后尾节点插入,失败了不断重试CAS。

现在等待队列入队完成了,怎么出队获取锁的权限呢?我们接着来看acquireQueued源码

acquireQueued
    final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

逻辑是: 定义一个获取锁的标识符failed,默认是失败的。定义一个中断的标识符interrupted ,默认是非中断的。
死循环中逻辑:
1.predecessor方法拿到当前节点的前置节点,为空抛空指针异常;
2.当head指针指向前置节点,说明当前节点有权限去竞争锁了,这是一种约定。如果获取锁成功,将head指针指向当前节点,将前置节点的next指针置为null,利于GC回收,将获取锁的标识符置为成功,返回未中断的标识符。
3.不满足2中的条件则判断shouldParkAfterFailedAcquireparkAndCheckInterrupt,看函数名,首先判断是否挂起等待,需要就挂起,并且判断外部是否调用线程中断;如果不需要,继续尝试获取锁。
4.finally块,判断是否竞争锁失败,如果是,取消当前节点的获取锁的行为,并从队列移出。
5.返回中断的标识符,在上层acquire方法判断是否中断,来选择是否调用当前线程中断,这里属于一种延迟中断机制。

这里有三点注意:

  1. 一个约定:head节点代表当前持有锁的节点。若当前节点的前置节点是head,那么该节点就开始自选的获取锁。一旦head节点释放,当前节点就能第一时间获取到
  2. shouldParkAfterFailedAcquireparkAndCheckInterrupt 的具体细节。
  3. nterrupted变量最终被返回出去后,上层acquire判断该值,来选择是否调用线程中断。

接下来我们看看上面2中的两个方法。

shouldParkAfterFailedAcquire

该方法的作用是 没有竞争锁权限的 或者 竞争失败的节点,是否应该被挂起。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}

逻辑是:
1.若前置节点predwaitstatus是signal,说明pred节点也在等待锁,并且之后会唤醒当前节点,所以当前节点可以挂起休息,返回true
2.如果前置节点ws>0,说明pred节点是cancelled,将其从队列移出,通过从后向前遍历,将pred指向遍历中第一个状态是非cancel的节点,相当于链式删除被cancel的节点,然后返回false,代表当前节点不需要被挂起,因为pred指向新的Node,需要重试外层逻辑。
3.除此之外ws还有两种可能,0和Propagate,为什么不可能是condition,因为waitstatus只有在其他条件模式下,才会被修改为condition,这里不会出现。 并且只有在共享模式下,才可能出现waitStatus为propagate,暂时不用管。那么在独占模式下,ws在这里只会出现 0的情况,代表pred出于初始化默认状态,通过CAS修改pred的状态为signal,然后返回false,重试外层逻辑。

这个方法涉及到对Node的waitstatus修改,相比比较关键。

如果shouldParkAfterFailedAcquire返回false,进行下一轮重试;如果返回true,代表当前节点需要被挂起,则执行parkAndCheckInterrupt方法。

parkAndCheckInterrupt
     private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

LockSupport.park(this)本质时通过Unsafe下的native方法调用操作系统原语将当前线程挂起。
此时当前Node中的线程将阻塞在此处,知道持有锁的线程调用release方法,唤醒后续节点。

Thread.interrupted() 有什么作用呢? 这是因为在线程挂起期间,线程可能被中断,park期间无法响应中断,所以只有当线程唤醒时,检查park期间是否被调用中断,有的话,传递出去,通过外层accquire 来响应中断。

总结以下,通过accquireQueued这个方法,我们可以明白,如果当前线程所在的系欸但处于头节点后一个,那么它将不断尝试自旋拿锁,直到成功,否则进行判断,是否需要挂起。这样就能保证head之后的一个节点在自旋CAS获取锁,其他线程在挂起或正在被挂起。这样最大限度避免无用的自旋消耗CPU。

上面我们讲到 大量线程被挂起,那么就会有被唤醒的时机。也提到,当持有所得线程释放了锁,会唤醒后续节点。 我们来看看 release方法。

release
    public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

tryAcquire一样,tryRelease也是AQS开放给上层自由实现的抽象方法。
release中,假如尝试释放锁成功,下一步就要唤醒等待队列的其他节点,这里我们重点看下unparkSucessor这个方法。参数是head Node。

unparkSucessor
    private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

首先拿到head节点的waitStatus,如果不为0,将其置为0,表示锁释放。接下来后续指针为null,或者处于canceled状态,那么从后向前遍历,找到除了head的最靠前的非canceled状态的node,对其unpark,让其尝试拿锁。

这里注意为什么要从尾节点向前遍历,而不是从前往后?

  1. 因为我们在addWaiter方法中,是先将后节点的prev指针指向前节点;然后原子操作将尾指针指向新插入的节点。然后才将前节点,next指针指向当前节点 操作不是原子性的。
  2. 新插入的节点是先感知到前节点的,如果从前往后遍历,可能前节点的next 还为建立好,遍历就结束了。

这时,拿锁 、挂起 、释放、唤醒 都能够有条不紊,且高效的运行。

2.同步状态—state

同步状态State来控制整体可重入的情况。State是Volatile修饰的,用于保证一定的可见性和有序性。

 /*** The synchronization state.*/
private volatile int state;

State这个字段主要的过程:

  1. State初始化的时候为0,表示没有任何线程持有锁。
  2. 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
  3. 解锁也是对这个字段-1,一直到0,此线程对锁释放。
  4. 共享锁是初始化state为n,每次获取锁-1,直到减到0,其他线程阻塞。

下面提供了几个访问这个字段的方法:

方法名描述
protected final int getState()获取State的值
protected final void setState(int newState)设置State的值
protected final boolean compareAndSetState(int expect, int update)使用CAS方式更新State

这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式共享模式(加锁过程)。


对于我们自定义的同步工具,需要自定义获取同步状态释放状态的方式,也就是AQS架构图中的第一层:API层。

二.AQS在JUC中应用

1.ReentrantLock-可重入锁

什么是ReentrantLock?
ReentrantLock基于AQS,在并发编程中,它可以实现公平锁非公平锁来对共享资源进行同步,同时,和synchronized一样,ReentrantLock支持可重入,除此之外,ReentrantLock调度上更灵活,支持更多丰富的功能。

源码分析:

public class ReentrantLock implements Lock, java.io.Serializable {}

ReentrantLock 实现了lock接口,lock接口的功能如下:

Lock的意义在于提供了区别于synchronized的另一种具有更多广泛操作的同步方式,它能支持更多灵活的结构,并且可以关联多个Condition对象,Condition是Java提供的一个用于线程通信的接口。
Lock有如下6个方法:

  • void lock(),顾名思义就是用来获取锁,假如当前锁被其他线程占用,那么将会等待直到获取为止
  • void lockInterruptibly(),和lock类似,也是用来获取锁,但区别在于,假如当前线程在等待锁的过程中中断了,那么将会退出等待,并抛出异常
  • boolean tryLock(),尝试获取锁,无论是否成功获取都立即返回,返回值代表是否获取锁。
  • boolean trylock(long time,TimeUnit unit),尝试获取锁并设定了等待超时时间,返回值代表是否获取锁。
  • void unlock(),释放锁。
  • Condition newCondition(),新建一个绑定在当前lock对象上的Condition对象。

-Condition对象是什么? 简单来说,他表示一个条件,不同线程可以通过该条件来进行通信。比如某个线程可以通过该对象的await方法注册在condition对象进行等待,然后通过condition对象的signal方法将该线程唤醒,这有点类似Object锁的wait和notify方法。但不同的是,一个lock对象可以关联多个Condition对象,多个线程可以被绑定在不同的Condition对象上,这样就可以分组等待唤醒,此外,Condition对象还提供了和限时、中断相关的功能,丰富了线程的调度策略。

ReentrantLock ,核心关注三个方面:

  • 属性:sync
  • 内部类:Sync、NonFairSync、FairSync
  • 方法:
    继承、实现方法: 实现Lock的方法
    私有方法:暂不关注
1.1 属性

ReentrantLock 只有一个属性,Sync类型的变量sync。sync被final修饰,意味着一旦初始化,就不可修改。在构造函数中初始化。

private final Sync sync;//构造函数
public ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

有参构造,通过传入boolean值 指定初始化为公平锁还是非公平锁。

1.2 内部类

Sync
Sync 继承AQS,除了lock方法和readObject方法外,其他都是final修饰的,意味着子类不可修改,说明这些对AQS内部方法的拓展实现,已经完整可靠,不希望被外部破坏。

   //继承AQSabstract static class Sync extends AbstractQueuedSynchronizer {//抽象,由子类去实现abstract void lock();//不公平模式获取锁final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}//可重入锁的释放流程protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())// 当前线程不是持有锁的线程,抛出异常throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}//当前线程是否独占式持有锁protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}//锁实例化Condition对象final ConditionObject newCondition() {return new ConditionObject();}// 当前锁的持有线程final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}//当前线程 持有锁的重入数final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}//当前锁是否被任何线程持有final boolean isLocked() {return getState() != 0;}//用于反序列化private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();setState(0); // reset to unlocked state}}
  • nonfairTryAcquire,该方法 逻辑比较简单:
    1.获取state,该值由AQS维护。
    2.当state=0,代表锁状态为空闲,便可以进行一次CAS来原子的更改state,state更改成功,代表获取了锁,将当前线程设置为独占线程,并返回true,否则返回false。
    3. 当state!=0,说明锁被占用,判断当前线程是否已经是独占线程,是的话,就可"重入获取锁",state+1,否则,返回false。
    这里有个细节,判断不为负数? 这是因为 int为16位,表示最大的有符号为2147483647,一旦超出便会溢出变为负数,所以我们可以这样理解,ReentrantLock允许的最大次数为214783647。

  • tryRelease, 释放锁的通用操作,这里返回的boolean类型值,不是代表是否释放成功,而是代表是否完全释放(因为存在重入,所以需要多次释放)。

公平锁和非公平锁
FIFO代表着公平锁,比如AQS中介绍的FIFO队列,实现的就是公平锁。非公平锁就是锁的分配不用按照请求锁的顺序,比如是抢占式的。
公平锁保证了只要你排队,一定可以获取锁,抢占式的非公平锁,很可能某个线程一直抢不到锁,又不断有新的线程加入进来抢锁,线程一直处于阻塞状态,这种状态称为线程饥饿

非公平锁的设计意义: 非公平锁意味着请求锁的线程可能在前面的休眠线程恢复前拿到锁,这样就有可能提高饼发的性能。 当唤醒挂起线程是,线程状态切换之间会产生短暂延时。 非公平锁就可以利用这段时间完成操作。这是非公平锁某些时候比公平锁性能好的原因。

我们来看下Sync的两种实现 NonFairSyncFairSync

NonFairSync

    static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock.  Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}

重写了 sync 的 lock和 AQS的tryAcquire方法,

  • lock 方法 先尝试一次获取锁,CAS成功,当前线程获取锁,失败,调用AQS的acquire方法。

这里两个问题值得讨论:
1.可重入性
当程序调用acqiue,先调用tryAcquire来获取锁,而nonfairTryacquire内部实现了可重入性,所以满足
2.非公平性
当程序调用lock的时候,会进行一次CAS,CAS失败,调用acquire,在acquire中,先调用一次tryAcquire,而nonfairTryAcquire会尝试先获取锁,如果锁被占用且不可重入,那么就会继续执行AQS后续的排队流程,虽然只有两次尝试抢占,但也体现出非公平性。

FairSync

    static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire.  Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}

重写了 sync 的 lock和 AQS的tryAcquire方法,

  • lock直接调用AQS的acquire

    1.可重入性
    调用acquire的时候,会首先调用一次tryAcquire,在tryAcquire方法内部,我们会看到锁被占用时候,将会进行可重入判断,和nonfairTryAcquire中类似。
    2.公平性
    tryAcquire中,首先判断锁是否空闲,空闲,此时不是直接CAS获取锁,而是需要判断是否存在前置等待节点。 如果不存在,说明在队列中确实已经轮到当前线程尝试获取锁,否则,执行AQS的后续等待逻辑,这里体现出了公平性。

1.3 方法

lock
该方法只是对sync对象的lock方法调用。在FairSync和NonfairSync 有具体的实现。

    public void lock() {sync.lock();}

lockInterruptibly
该方法和lock方法的区别在于,当线程在等待锁的期间,是否立即响应中断。lock方法,线程会在等待获取锁之后,再响应中断,这点在AQS中实现了。 lockInterruptibly方法中,若线程在等待获取锁期间被调用了中断,那么将会立即抛出中断异常。调用了sync的acquireInterruptibly,该方法的实现存在于AQS内部。

    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}

tryLock
直接调用sync对象的nonfairTryAcquire方法,无论ReentrantLock被指定为公平还是非公平,tryLock都是非公平的,这样设计是合理的。也是把nonfairTryAcquire放在Sync的原因。

引申:

Java AQS 核心数据结构-CLH 锁
从ReentrantLock的实现看AQS的原理及应用