Java并发编程(三)JUC锁基础框架抽象同步队列的学习总结

/ 默认分类 / 0 条评论 / 1030浏览

java并发包中的锁机制(抽象同步队列)

前面介绍了synchronized锁机制,就是使用进出监视器实现排斥作用,并且之后进行了锁优化,提升了synchronized的性能。 synchronized就是通过设置对象头的markword,适应不同的锁竞争情况,使用不同的锁状态,只有最终重量级锁才会调用操作系统原子指令 mutex(进出监视器指令,阻塞挂起等待的线程)。

java中还提供了可以自己实现并发线程调度的锁机制,之前我有介绍LockSupport,这个类基于Unsafe实现的,提供了线程直接调度的方法,包括挂起和唤醒等操作, java并发包中实现的锁机制都是基于AQS(AbstractQueuedSynchronizer 抽象同步队列)实现的。

AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node,其中Node中的thread变量用来存放进入AQS队列里面的线程; Node节点内部的SHARED用来 标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的, EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入. AQS队列的; waitStatus记录当前线程等待状态,可 以为CANCELLED (线程被取消了)、SIGNAL (线程需要被唤醒)、CONDITION (线程在条件队列里面等待)、PROPAGATE (释放共享资源时需要通知其他节点) ; prev记录当前节点的前驱节点, next记录当前节点的后继节点。

AQS(AbstractQueuedSynchronizer)中有下面三个成员变量:

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

封装的竞争锁的线程的Node对象:

static final class Node {
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         */
        volatile int waitStatus;
        volatile Thread thread;

前面两个变量就是存放当前争夺该锁的线程数据的双向队列,后面的state变量就是表示当前锁状态的重要值,线程操作修改state来表示当前 锁和线程的关系。

下面介绍AQS实现独占锁的方式,主要依赖AQS中下面几个方法:

当一个线程调用独占锁的acquire方法,则表示该线程想获取独占锁,方法里面会调用tryAcquire方法,这个方法在aqs中没有实现,需要aqs的子类实现,主要就是 通过设置state值来表示当前锁的状态,比如aqs的子类ReentrantLock就实现了tryAcquire方法,该方法中设置state的值,如果设置成功就返回,失败就将当前队列 封装未Node节点放入该锁的同步队列中,并调用LockSupport的park将自己挂起。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
   

当线程调用release方法后,也是修改state值,可以看到这里调用了tryRelease方法,这个方法同样的也是需要AQS的子类实现,比如ReentrantLock中就是将state状态减1,如果 state为0了就表示当前线程释放了该锁,那么就会调用LockSupport的unpark方法唤醒该锁上阻塞的一个线程,告诉它可以去拿锁了,我用完了。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

上面可以看出,AQS是独占锁的基础同步框架,其中的主要逻辑需要子类自己实现,即子类需要定义state不同值代表的状态,然后子类中还需要使用CAS来更新state的值,通过减小或增大state的值 表示锁的操作,比如ReentrantLock这个aqs的子类中,定义state=0表示当前没有线程持有锁,所以当该锁第一次被线程1获取了,线程1cas(0,1)成功修改state状态为1,那么当前锁表示被线程1 获取,其他线程如果cas(0,1)就会失败,并进入阻塞挂起队列中,如果线程1再次获取锁,那么就使state值加1,所以线程1要释放锁需要一层层释放,直到state为0则表示线程1释放了锁,并且唤醒1个 阻塞的等待线程。

前面分析synchronized的时候,介绍了wait(),notify(),notifyAll()这些方法,其实这些就是为了配合synchronized实现线程间同步操作的,可以主动实现线程阻塞挂起和唤醒,他们的操作其实 也就是借助sychronized关联的监视器对象。其实现在介绍的java并发包中的AQS,其实也支持这些操作,在AQS中叫做条件变量,可以创建一个当前锁的条件变量,条件变量的await()和signal()和signalAll()方法 就可以实现和wait(),notify(),notifyAll()一样的效果. 下面就是一个简单的例子。

        ReentrantLock lock = new ReentrantLock();
        condition condition = lock.newCondition();
        lock.lock();
        try {
            System.out.println("begin wait");
            condition.await();
            System.out.println("end wait");
        } catch (
                Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        lock.lock();
        try {
            System.out.println("begin signal");
            condition.signal();
            System.out.println("end signal");
        } catch (
                Exception e) {
            e.printstackTrace();
        } finally {
            lock.unlock();
        }  

ReentrantLock就是利用了AQS抽象同步队列框架实现的一个独占锁,声明锁的时候可以指定为公平锁和非公平锁,所谓公平和非公平其实就是前面说的释放锁后唤醒其他阻塞线程的顺序,如果按照 先阻塞之后就先唤醒那么就是公平锁(先来后到),否则就是非公平。

ps:关于java并发包中AQS的详细实现原理还有很多需要学习的,这里我先总结到这,后面有时间,再详细总结下AQS的原理。