ConcurrentLinkedQueue与LinkedBlockingQueue

/ 默认分类 / 0 条评论 / 954浏览

ConcurrentLinkedQueue与LinkedBlockingQueue

一.ConcurrentLinkedQueue

ConcurrentLinkedQueue内部存储结构是一个volatile修饰的无限队列,保证线程内存可见性
ConcurrentLinkedQueue使用cas保证操作原子性,并发环境下,不会发生线程等待阻塞。

private transient volatile Node<E> head;

1.offer入队操作

插入的数据不允许为null,否则会抛异常

    public boolean offer(E e) {
        checkNotNull(e);
    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }

插入数据的时候使用的是循环尝试CAS保证插入的原子性

for (Node<E> t = tail, p = t;;) {  //不断循环尝试,直到插入成功
                    Node<E> q = p.next;//获取队列尾元素的next元素(待插入肯定是null)  
                    if (q == null) { 
                        // p is last node
                        if (p.casNext(null, newNode)) {//如果插入期间尾节点的next还是null,那么就cas成功(return true,循环结束),否则失败继续循环
                            // Successful CAS is the linearization point
                            // for e to become an element of this queue,
                            // and for newNode to become "live".
                            if (p != t) // hop two nodes at a time
                                casTail(t, newNode);  // Failure is OK.  
                            return true;
                        }
                        // Lost CAS race to another thread; re-read next
                    }
                    else if (p == q)
                        // We have fallen off list.  If tail is unchanged, it
                        // will also be off-list, in which case we need to
                        // jump to head, from which all live nodes are always
                        // reachable.  Else the new tail is a better bet.
                        p = (t != (t = tail)) ? t : head;
                    else
                        // Check for tail updates after two hops.
                        p = (p != t && t != (t = tail)) ? t : q;
                }

2.poll出队操作

出队操作也是使用cas保证操作原子性

 public E poll() {
        restartFromHead:
        for (;;) { 
            for (Node<E> h = head, p = h, q;;) {//cas循环尝试
                E item = p.item;  //这里先获取头元素

                if (item != null && p.casItem(item, null)) { //出队等于将头元素变为空,然后返回之前获取的头元素,这里cas比较还是否为之前获取的头元素  
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item; //操作成功  ,循环终止  
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

size()获取元素数量

    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

可以看出来,因为cas操作代码层没有加锁,所以再并发环境下,size方法返回的数据可能再遍历期间数据会增加或减少导致结构不准确;

ps:这里也是和其他集合的一个不同点,其他集合比如list类的会在操作中维护size,需要时直接return即可

二.LinkedBlockingQueue

LinkedBlockingQueue相比前面的ConcurrentLinkedQueue区别就在于,其使用了独占锁保证操作原子性,所以会发生线程阻塞等待.
并且LinkedBlockingQueue队列支持阻塞队列的操作,出队列操作时,如果队列为空,则会发生阻塞,直到有数据才会返回;同样的插入数据也是,当插入 数据时当前队列长度已经达到最大则阻塞,直到有空间,下面我从源码层面分析总结下:

    //出队列的独占锁
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    //出队列操作的阻塞线程的条件队列
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    //入队列的独占锁  
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    //入队列操作的阻塞线程的条件队列
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

1. offer入队操作

//timeout是线程阻塞超时时间,之前在介绍juc的时候有说明过,其实就是和并发操作中的wait(timeout)是一样的意义     
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        //计算timeout超时时间  
        long nanos = unit.toNanos(timeout);
        int c = -1;
        //入队操作的独占锁  
        final ReentrantLock putLock = this.putLock;
        //也是cas保证当前元素数量的原子性  
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //如果当前队列已经满了,这里放在while循环中,保证 如果唤醒后还是满的那么就还是会阻塞  
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                    //如果当前队列已经满了,那么当前线程进入入队的阻塞条件队列
                nanos = notFull.awaitNanos(nanos);
            }
            //如果队列有空间,直接入队  
            enqueue(new Node<E>(e));
            //cas增加count+1,返回的数据c是cas修改前的数据
            c = count.getAndIncrement();
            if (c + 1 < capacity)
            //如果现在队列还有多于空间,唤醒之前其他入队时阻塞的线程,告诉他们现在可以来插入数据了,这里是一个默认的非公平独占锁    
                notFull.signal();//(1)
        } finally {
            putLock.unlock();
        }
        //这里告诉其他获取时阻塞的线程,现在不是空了,我加了数据了,快去抢!
        if (c == 0)
            signalNotEmpty();
        return true;
    }

2.take()出队操作

出队操作其实就是一样的道理

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                //如果出队时当前队列没有数据,那么当前线程进入出队线程阻塞条件队列  
                notEmpty.await();
            }
            //如果有数据可以出,那么直接出数据
            x = dequeue();
            //cas原子操作保证count-1
            c = count.getAndDecrement();
            //出完数据之后,如果当前还有数据,就唤醒之前因为出数据没有数据导致阻塞的线程,告诉他们现在可以来出数据了  
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    同样的,出队也可以设置阻塞时间,源码的含义基本一样,不赘述    
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

ps: 真的,我的博客中不仅一次感叹这些框架,sdk的开发者真的考虑问题非常全面仔细,这里说一下,入队操作和出队操作最后的if中的唤醒操作(建议你可以先自己看下jdk源码,不要看我的注释):

在offer入队操作中,c == 0 表示在本次添加元素前,队列是空的(这是Unsafe类中实现的,我之前的博客中有介绍),那么就唤醒当前阻塞的获取数据的线程 我第一次看到的时候,认为,唤醒当前阻塞的获取数据的线程,那是不是应该c != 0作为条件,因为这表示当前插入元素之前,队列中有至少一个数据,这样唤醒那些阻塞的线程告诉他们现在有数据了 但其实应该时c==0,因为这表示在我本次插入元素之前,队列中元素个数为0,那么这就说明可能会有线程阻塞了,因为如果c!=0,那么肯定没有阻塞了,因为在我插入数据之前队列就有数据,那么取数据 时候怎么会阻塞呢?所以这里就是c==0,我插入前是空的才会发生阻塞,那么就唤醒那些获取数据阻塞的线程,告诉他们我插入数据了,现在不是空了,你们快去抢; 当然你可能会说,可以等下一个线程 操作获取数据时,它会操作后唤醒其他线程,是的,是会唤醒,但是条件是它获取后至少还有数据,另外它啥时候才会来获取这是未知的,如果不来,那难道本次插入的元素就要一直不被取走,即使已经 有很多想取走但是之前阻塞了的在那里了?所以这里的设计是非常周全合理的。

同理,在出队操作中,如果c == capacity,也就是说在本次出队操作之前,队列时满的,那么就说明之前可能有线程想插入数据但是因为队列是满的阻塞了,所以这里直接唤醒,告诉他们,我刚刚移出去了一个 数据,你们这些想往里面插数据的赶紧去抢这个坑吧!

3. 出队和入队不阻塞的操作

LinkedBlockingQueue中也支持不阻塞的调用,比如:

入队:

 public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

出队:

public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }