JUC线程池源码分析(二)

/ 默认分类 / 0 条评论 / 1147浏览

上一篇文章已经进行了简单的源码分析,下面我们来从几个问题来继续了解下线程池的原理.

  1. 既然我们说线程池可以避免线程的反复创建,那么线程池中的线程是如何保存重复利用的?

首先Thread线程start之后就是执行其中的runable执行逻辑(run方法),所以我们可以想象下,这个线程的重复利用就是一个不断的更新线程执行的逻辑,但是线程本身的执行 逻辑run方法中应该是一个自旋循环,不断地获取新的任务,不断的执行,如果没有就阻塞,所以前面我们看到存储执行任务的数据结构是阻塞队列,下面我们来通过源码看看是不是这么一回事.

从前面的源码分析可以看出,其实线程池中的线程是被封装为了Worker对象,Worker对象前面我们已经简单介绍了一些源码,下面我将完整的源码贴在出来了, Worker刚new出来一个线程池的时候,

//实现了AQS,所以worker自身就是一个独占锁,按照下面实现的tryAcquire和tryRelease可以看出,worker和ReenterLock是类似的.
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        //这就是Worker中封装的线程
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        //创建worker的时候就可以为该worker分配一个初始的任务,当然这个任务可以为空
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
         //这里可以很清楚的看出,new出来一个新的Worker时候,构造方法中就为Worker中封装的Thread线程对象赋值了
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //这一步非常重要,可以看到,这里传入到线程工厂中的参数是this,也就是当前创建的Worker本身,在前面我们介绍了线程创建工厂,所以这里传入了this,而
            //Worker也是实现了Runable接口,所以这里为当前Worker赋值而创建的线程的执行逻辑就是Worker自己,哈哈哈,现在直到为什么Worker要实现这个接口了吧好家伙
            //我自己封装我自己
            this.thread = getThreadFactory().newThread(this);
        }

        //那么我自己封装的线程执行逻辑最终是执行啥呢?就是下面这段代码,看到这里请直接跳出这个源码块,看下面的分析
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

上面说道了Worker自己就是线程池中封装的线程,也就是说可以任务Worker自己就是线程,并且Worker自己就包含了线程的执行逻辑,线程池中的线程的执行逻辑都是一样的,都是下面的方法

public void run() {
    runWorker(this);
}

也就是说,线程池中的线程被安排要执行的任务是被封装在了Worker中的firstTask这个Runable执行逻辑属性中的,或者是直接获取存储任务的阻塞队列中.后面会详细介绍. 所以,线程池中的线程其实就是被封装为了Worker,所以下面我说Worker其实就是说的线程池中的线程.
在之前介绍addWorker()的方法的时候,可以看到,new出来一个worker,然后进过一些必要的判断之后,先获取了Worker中封装的线程,然后直接start了这个线程

t.start();

所以也就是说,线程池中的加了一个线程了并且立刻进入了runable状态,当然如果它立刻获得了cpu时间片就会是runing状态了,那么也就是执行这个Worker中封装的上面run方法
下面就是runworker方法的源码

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //首先获取当前worker的firstTask
        Runnable task = w.firstTask;
        //也就是说,无论有没有设置firstTask,第一次启动执行worker之后,firstTask都是null了
        w.firstTask = null;
        w.unlock(); // allow interrupts
        //completedAbruptly表示线程是否出现异常退出
        boolean completedAbruptly = true;
        try {
        //这里就是重点重点,可以看到,线程启动后就是一个循环,获取firstTask或从任务阻塞队列中获取任务来不断执行
            while (task != null || (task = getTask()) != null) {
            //执行任务前需要先获取锁,所以线程池中对单个线程的操作是线程安全的
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行任务前,这个方法在ThreadPollExecutor没有实现,protect修饰,可以自行拓展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    //这里就是真正的执行获取到的任务  
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    //执行任务后,这个方法在ThreadPollExecutor没有实现,protect修饰,可以自行拓展
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //这是为了统计线程池中执行的所有任务数,只要把每个Worker的加在一起就行了
                    w.completedTasks++;
                    //当前worker执行完逻辑后释放所
                    w.unlock();
                }
            } //while循环到这里终止  
            completedAbruptly = false;  //如果线程正常执行任务结束,那么就将异常退出设置为false
        } finally {
            //线程退出while循环后最终都会执行下面的线程回收
            processWorkerExit(w, completedAbruptly);
        }
    }

ps:上面的while (task != null || (task = getTask()) != null) {,这段就是主要的线程重复利用的原理,getTask()方法就是从阻塞队列中获取执行任务,下面重点分析下这个方法

所以,在正常情况下,while循环是否会退出,取决于getTask()方法是否会返回null,也就是任务队列是否还有任务等待执行,但是这里任务队列是阻塞队列,所以按道理while正常情况下 会一直执行不会退出,因为即使任务队列为空,那么出队列的操作也是会发生阻塞,直到有数据获取到. 但是这只是一种情况,真实的还有一种情况就是获取超时操作,也就是我们初始化线程池时候 设置的是否允许核心线程超时,下面分析下getTask的源码

    private Runnable getTask() {
    
        boolean timedOut = false; // Did the last poll() time out?
        //自旋
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果线程池不是运行状态,且任务队列为空,就减少一个工作者线程数量,并返回null
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //获取是否允许线程空闲超时,要不是设置了允许核心线程超时,要不是当前线程数多于核心线程数 
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            //第一次timedOut = false,只可能是wc > maximumPoolSize,那么发现任务队列是空的就减少工作线程数,然后直接返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //如果线程池一切正常,就会执行下面的,从任务队列获取任务,这里很重要,可以看到如果设置了允许核心线程超时或者当前线程数多于核心线程数,那么
            //从阻塞任务队列中获取任务时就允许获取超时,超时时间就是keepAliveTime(线程空闲时间),否则执行阻塞获取take,这些juc中的数据结构我之前都有
            //分析源码,需要的小伙伴可以搜索我之前的博客  
            try {
            //所以这一步可能会阻塞
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                    //如果获取到了任务就直接返回任务
                if (r != null)
                    return r;
                //如果执行到这里了说明获取超时了,进入下一次自旋获取
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

ps:综上可以看出,getTask获取任务,排除线程池参数异常或状态异常的情况会直接返回null,其他情况下getTask本身会自旋获取任务,所以即使是阻塞超时获取也会不断自旋, 如果是阻塞获取,那么就会阻塞在task()上,所以getTask方法可以认为是不获取到任务不罢休,除非出现线程池本身问题.

下面看下,线程回收的执行逻辑,要不是线程执行时发生中断,退出while循环,这种等于异常退出,completedAbruptly为true,否则,getTask方法返回null,正常空闲退出,执行下面的 回收逻辑,因为空闲所以需要回收,这里空闲回收肯定是满足了getTask为null(所以才会出现退出while循环),并且getTask返回是null,那么就已经在getTask中按照线程池设置的进行了判断 (allowCoreThreadTimeOut || wc > corePoolSize)

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //如果是异常退出,减少一个工作线程数量  
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //移除当前的工作线程,数量在前面的getTask中已经减过了,这里直接移出线程集合  
            //这说明进入processWorkerExit方法之后,当前Worker线程肯定会被回收删除  
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
        //如果不是异常退出,并且现在的线程池中的线程数量需要增加,那么就addWorker
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }  

有了上面源码的分析,我们就可以来总结下面的几个问题:

  1. 线程池中的线程什么时候会被回收?是怎样被回收的?

线程池中的线程会执行RunWorker方法,然后在一个循环里面不断获取任务队列中的runable任务,这是一个阻塞队列,可以根据初始化时设置的是否允许核心线程超时来决定是否需要 阻塞获取超时,如果阻塞获取超时也是会继续执行前面的for循环,之后会减小worker的数量并返回null,所以退出while循环,开始回收空闲的工作线程