Java锁机制AQS分析总结

/ 默认分类 / 1 条评论 / 2177浏览

Java锁机制AQS分析总结(一)

一.前言

前面介绍了synchronized锁机制,就是使用进出监视器实现排斥作用,并且之后进行了锁优化,提升了synchronized的性能。 synchronized就是通过设置对象头的markword,适应不同的锁竞争情况,使用不同的锁状态,只有最终重量级锁才会调用操作系统原子指令 mutex(进出监视器指令,阻塞挂起等待的线程)。

java中还提供了可以自己实现并发线程调度的锁机制,之前我有介绍LockSupport,这个类基于Unsafe实现的,提供了线程直接调度的方法,包括挂起和唤醒等操作, java并发包中实现的锁机制都是基于AQS(AbstractQueuedSynchronizer 抽象同步队列)实现的。

AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node,其中Node中的thread变量用来存放进入AQS队列里面的线程; Node节点内部的SHARED用来 标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的, EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入. AQS队列的; waitStatus记录当前线程等待状态,可 以为CANCELLED (线程被取消了)、SIGNAL (线程需要被唤醒)、CONDITION (线程在条件队列里面等待)、PROPAGATE (释放共享资源时需要通知其他节点) ; prev记录当前节点的前驱节点, next记录当前节点的后继节点。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer

AQS(AbstractQueuedSynchronizer)中有下面三个主要成员变量:

    //同步阻塞队双向队列头部节点
    private transient volatile Node head;
    
    //同步阻塞队双向队列尾部节点
    private transient volatile Node tail;

    //同步状态,不同的锁的实现就是操作state变量来设计的  
    private volatile int state;

ps: 前面两个变量就是存放当前争夺该锁的线程数据的双向队列,后面的state变量就是表示当前锁状态的重要值,线程操作修改state来表示当前锁和线程的关系。

图解AQS简约模型

图解AQS简约模型

同步阻塞双向队列中封装竞争锁的线程的Node节点对象:

static final class Node {
            //获取锁的模式
            static final Node SHARED = new Node();
            static final Node EXCLUSIVE = null;
    
            //该节点线程当前出于什么状态  
            static final int CANCELLED =  1;
            static final int SIGNAL    = -1;
            static final int CONDITION = -2;
            static final int PROPAGATE = -3;
            volatile int waitStatus;//就是上面几个值
    
            //双向循环链表
            volatile Node prev;
            volatile Node next;
            
            //封装在节点中的线程  
            volatile Thread thread;
            //
            Node nextWaiter;

图解同步阻塞队列节点
图解同步阻塞队列节点

下面介绍AQS实现独占锁的方式,主要依赖AQS中下面几个方法:

ps:AQS实现锁机制的大概逻辑(下面会详细介绍):当一个线程调用独占锁的acquire方法,则表示该线程想获取独占锁,方法里面会调用tryAcquire方法,这个方法在aqs中没有实现,需要aqs的子类实现,主要就是 通过设置state值来表示当前锁的状态,比如aqs的子类ReentrantLock就实现了tryAcquire方法,该方法中设置state的值,如果设置成功就返回,失败就将当前队列 封装未Node节点放入该锁的同步队列中,并调用LockSupport的park将自己挂起。

下面先来探讨下Reenterlock独占锁的详细实现原理

lock()加锁操作

        final void lock() {
            acquire(1);
        }
  1. 加锁成功则结束
  2. 加锁失败,则封装当前节点并入同步阻塞队列,入队后再尝试获取锁(直到获取锁成功,后面会解释)
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire是加锁的操作,从上面的过程图可以看出,在ReenterLock中有两种实现,分别是公平锁和非公平锁.

protected final boolean tryAcquire(int acquires) {
            //获取当前调用的线程,即当前视图加锁的线程
            final Thread current = Thread.currentThread();
            //获取当前独占锁的状态值state
            int c = getState();
            //如果独占锁状态是0,则表示当前锁没有属于任何线程,可以来获取
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前线程就是持有锁的线程,锁状态加1然后返回true,可重入性
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

Q:这里来解释一个问题,为什么判断了if(c == 0)之后,在逻辑里面还调用hasQueuedPredecessors()?

先来看下hasQueuePredecessors的源码

    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

hasQueuedPredecessors()方法是判断当前同步阻塞队列有没有其他线程先一步也在获取锁,并且当时它获取state的时候也是0,但是此时这个抢先一步的 线程已经作为节点插入了同步阻塞队列,那么就不需要进行cas修改state,直接进入阻塞队列即可
如果有说明那个先一步的线程已经初始化了阻塞队列,那么此时当前队列加锁失败,tryAcquire返回false,如果没有其他线程 先一步,就表示不要排队,那么当前线程的确就是第一个来获取锁的,那么直接cas获取锁并设置为所属线程,tryAcquire返回true

下面看下,获取锁失败后入队的操作:

    private Node addWaiter(Node mode) {
        //将当前线程封装为阻塞队列的Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //获取尾节点
        Node pred = tail;
        //如果尾节点不为null,则说明当前的阻塞队列已经有阻塞节点了,那么直接放入尾节点之后
        if (pred != null) {
            //tail(旧尾) <-- currentNode
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                //tail(旧尾) --> currentNode  
                pred.next = node;
                return node;
            }
        }
        //如果pred是null,这说明当前阻塞队列可能是第一次初始化,阻塞队列是空的,当前节点作为头尾节点
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
    //这里使用cas自旋操作,放在循环中,因为可能目前已经不满足之前的阻塞队列为空的条件了.
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //执行到这里说明,当前同步阻塞双向队列是空的,进行初始化,并且可以看出,同步阻塞双向队列的头元素是没有保存线程
                //数据的,初始化的时候设置的是一个空Node对象,head只保存了下一个节点的指针
                if (compareAndSetHead(new Node()))
                    //这样,下面再次执行for循环,tail就不是空了
                    tail = head;
            } else {
                //先: tail(即将成为旧的尾节点的尾节点) <-- currentNode
                node.prev = t;
                //后: cas设置当前准备添加的同步阻塞双向队列的节点为tail
                //cas成功,则tail(旧尾节点了) --> currentNode(当前操作的节点,并且已经是新的tail)
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

至此,入队操作结束,并且addWaiter方法返回当前入队的节点,之后调用acquireQueued(Node node)
从名字可以看出,该方法的意思是入队后再次获取锁.下面分析下源码:

//这个方法可以理解为入队列后,尝试获取锁
    final boolean acquireQueued(final Node node, int arg) {
        //默认假设加锁就是失败了
        boolean failed = true;
        try {
            //设置当前线程的中断标识为false
            boolean interrupted = false;
            //当前节点线程会一直自旋,直到获取锁成功,期间可能会被park
            for (;;) {
                //获取当前节点的前节点
                final Node p = node.predecessor();
                //如果当前节点的前节点就是head,那么说明当前节点就是第同步阻塞队列的排在第一的线程节点
                // 那么就尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    //如果获取锁成功那么就会执行这个if,设置同步阻塞队列的头为当前节点(同步阻塞队列的头结点没有保存阻塞线程的作用),只是保存了下一个节点的状态,这里需要参考CLH
                    //所以这里设置这个已经从同步阻塞队列中获取了锁的节点,它等于已经出队列了,也就等于将之前的头结点去掉了
                    setHead(node);
                    p.next = null; // help GC
                    //设置加锁失败标志为false
                    failed = false;
                    //加锁成功后,返回中断标志(可能是中断状态,因为下面会修改)
                    return interrupted;
                }
                //如果获取锁失败,判断当前是否需要被park(其实就是判断当前节点的前一个节点的waitStatus是否被设置为SIGNAL了)
                if (shouldParkAfterFailedAcquire(p, node) &&//如果需要park,则调用park
                    parkAndCheckInterrupt())
                    //设置中断标志为true
                    interrupted = true;
            }
        } finally {
            //最终,如果自旋加锁失败了,就废弃当前节点
            if (failed)
                cancelAcquire(node);
        }
    }
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Q:为什么acquireQueued()方法返回中断标志,并且返回中断标志后,在acquire()方法中还调用一次中断?

这个问题我有在另一篇文章中介绍,参考跳转文章

再来看下shouldParkAfterFailedAcquire()
该方法就是每次加锁失败后判断是否需要park阻塞挂起当前线程节点,这和线程节点状态相关(可能是有其他线程调用了条件队列相关操作修改了状态,那么这里就会起作用)
并且这里如果检查到线程节点状态为canclled,那么就直接从同步阻塞队列中移除该线程节点

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //这里也可以看出,当前节点的状态信息保存在上一个节点中  
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

上面已经大体介绍了AQS中枷加锁过程,下面我画了一个流程图,大致总结下:
这里假设线程1获取到了锁
AQS加锁过程

下面再画一下,AQS同步阻塞队列在加锁过程中的变化
(1) 初始时CLH为空

ps:java中的AQS就是使用的CLH锁修改的,关于CLH和AQS的简单比较可以在我的博客搜索CLH

初始时CLH为空

(2) 线程1获取到锁
线程1获取到锁

(3) 线程2来获取锁,但是发现已经被线程1持有了,所以线程2节点Node2入队,入队时和前面分析的一样,会设置一个空的Node作为head(首次入队)
enq()中的if执行完毕

enq()中的else执行完毕,Node2正式入队,并且可以看到,Node2的状态是在其前驱节点head中的

(4) 线程1释放锁了,unpark后面一个线程(后面会介绍),找到状态是非Cancelled的离head最近的线程节点进行唤醒,在这里Node2就获取到锁了 下面的图中就画出了阻塞队列中的节点获取到锁后AQS同步阻塞双向队列的变化.

ps:这一步是非常重要的,可以看出来,AQS中删除一个已经获取锁成功的节点就是直接将AQS的头结点指向后面的一个,并且让旧的head的指针和原来其中 封装的线程变为null即可(关于为什么要这样设计在我的其他文章会进行分析)
点击跳转查看(为什么AQS中Node节点的线程状态保存在上一个节点中?)

(5) 附加:同步阻塞队列中存在多个阻塞线程的情况

unlock()释放锁操作

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        //释放可重入的独占锁一次,如果锁完全释放了,即state为0,那么tryRelease(arg)返回true
        if (tryRelease(arg)) {
            //获取当前阻塞队列的头结点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //如果头结点不是空,并且头结点存在状态unpark头结点的等待的线程(保证队列有节点数据,需要唤醒),告诉他,你可以去抢锁了,我已经释放了
                unparkSuccessor(h);
            return true;
        }
        //说明可重入的独占锁还没有释放完
        return false;
    }

释放锁后设置当前的独占线程为null,返回当前锁是free状态(true),释放锁成功

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

重要的是unparkSuccessor()方法,该方法需要理解为,unpark唤醒后续阻塞线程节点(如果有)

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        //获取到head节点的状态
        int ws = node.waitStatus;
        //如果head节点的状态为负数,也就是线程有条件状态,那么就清除,将状态设置为0,至于为什么下面我会分析
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
         //获取head的下一个节点
        Node s = node.next;
        //如果下一个节点是空或者下一个节点的状态是取消状态,那么就说明当前的节点是无效节点,不需要被唤醒,继续寻找  
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

Q1: 为什么要先判断head的状态是不是负数然后再清除为0?

这个问题我是这样理解的,因为head节点在下一个节点入队时状态会被设置为-1,也就是需要park阻塞的状态,所以在节点进入的时候 默认是需要被park阻塞等待的,那么执行到unlock中的unparkSuccessor()也就唤醒后续节点中阻塞的线程,所以无论本次是否 找得到一个需要唤醒的线程节点,都先将head的状态清除并设置为0.因为如果找到了一个需要唤醒的节点,那么当前的head就会被替换, 如果没有找到,那么当前head的状态已经设置为0,下一次有节点元素再次释放锁,进入unlock方法,也会因为waitStatus=0,不需要进入 unparkSuccessor();

Q2: 为什么在unparkSuccessor中查找需要被唤醒的节点是从tail向head查找呢?

这个问题我觉得是AQS中考虑得非常周全的地方,前面我在解析lock的过程中分析了节点入队的操作,也就是addWaiter()

主要的入队源码如下:

    private Node addWaiter(Node mode) {
        //将当前线程封装为阻塞队列的Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //获取尾节点
        Node pred = tail;
        //如果尾节点不为null,则说明当前的阻塞队列已经有阻塞节点了,那么直接放入尾节点之后
        if (pred != null) {
            //tail(旧尾) <-- currentNode
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                //tail(旧尾) --> currentNode  
                pred.next = node;
                return node;
            }
        }
        //如果pred是null,这说明当前阻塞队列可能是第一次初始化,阻塞队列是空的,当前节点作为头尾节点
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
    //这里使用cas自旋操作,放在循环中,因为可能目前已经不满足之前的阻塞队列为空的条件了.
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //执行到这里说明,当前同步阻塞双向队列是空的,进行初始化,并且可以看出,同步阻塞双向队列的头元素是没有保存线程
                //数据的,初始化的时候设置的是一个空Node对象,head只保存了下一个节点的指针
                if (compareAndSetHead(new Node()))
                    //这样,下面再次执行for循环,tail就不是空了
                    tail = head;
            } else {
                //先: tail(即将成为旧的尾节点的尾节点) <-- currentNode
                node.prev = t;
                //后: cas设置当前准备添加的同步阻塞双向队列的节点为tail
                //cas成功,则tail(旧尾节点了) --> currentNode(当前操作的节点,并且已经是新的tail)
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

可以看到,入队时,分为以下三步,整体不是原子性操作,在执行期间,其他线程可能会调用unlock中的获取后续需要唤醒的节点.
入队步骤

那么就会出现问题,比如在第一步执行完之后,在最后一步执行完之前就在遍历获取节点元素了,如果从头部开始往tail遍历,那么就会 出现无法访问到最后一个刚入队的元素,因为第三步还没有结束,没有next指针指向他,所以如果使用从尾部开始遍历,只要第二步的cas设置 tail结束了即可,不需要等到第三步结束. 所以这里的设计考虑真的是太太太严谨了

ps:以上仅为我个人在阅读源码时的解读思想,有不同意见的大佬请下方留言探讨哦


补充: 要搞懂AQS,需要深刻理解AQS中的下面两个关键状态变量:

  1. 同步器的锁状态
private volatile long state;
  1. 同步队列Node节点状态(表示继承者节点状态)
volatile int waitStatus;
  1. "比如aqs的子类ReentrantLock就实现了tryAcquire方法"这里我说的有问题,但意思大家明白即可,ReentrantLock是基于同步器实现的同步锁,所以AQS子类在ReentrantLock中作为内部类,起到同步器的作用,这一点Doug Lea大佬在AQS的官方注释中也有提到.