JUC线程池源码分析(一)

/ 默认分类 / 0 条评论 / 1145浏览

JUC线程池源码分析

一.执行器接口Executor

执行器接口,其中只有一个执行方法execute(),参数为Runable线程执行逻辑

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

二. 执行器服务接口ExecutorService

执行器服务接口,其中包含了很多种执行操作

public interface ExecutorService extends Executor {
    //关闭执行器,已经提交到执行器的任务会继续被执行,但是改方法不会等待他们执行完毕,并且执行器不会再接收其他的任务了
    void shutdown();
    
    //阻塞关闭执行器,会等待现有的任务执行完毕,并返回成功或失败,并且可以设置阻塞超时时间(和上面不同的一点就是会阻塞等待当前现有的任务执行完毕)
    boolean awaitTermination(long timeout,
                             TimeUnit unit)
                      throws InterruptedException;
    
    //停止当前执行器中的所有任务,包括正在执行的任务和等待执行的任务,并返回等待的任务列表  
    List<Runnable> shutdownNow();
    
    //submit系列方法和invoke系列方法可以提交新任务到执行器中执行  
    ......
}

三. 抽象执行器服务类AbstractExecutorService

执行器服务接口实现类-抽象执行器服务类

public abstract class AbstractExecutorService implements ExecutorService {
        //先来看下这个方法,AbstractExecutorService实现了submit方法,里面调用了execute方法,但是没有实现,所以
        //这是一个抽象类,需要子类来实现execute
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
}

四. ThreadPoolExecutor线程池执行器

线程池执行器,我们且就叫他线程池,线程池集成了抽线线程执行器服务类

public class ThreadPoolExecutor extends AbstractExecutorService {

java中线程池主要就是该类,下面我们源码的角度分析下该类

    //ctl是原子操作数据类型,其高三位表示线程池状态,剩余的低位表示线程个数  
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

这里来简单分析下为什么是这样表示的高低位数据,Integer.SIZE表示获取当前操作系统上的int类型数据的位数,一般位32,所以 COUNT_BITS的值就是29,所以RUNNING值就是-1<<29也就是11111111 11111111 11111111 11111111 << 29也就是 11100000 00000000 00000000 00000000,同样的道理,SHUTDOWN就是00000000 00000000 00000000 00000000,STOP就是 00100000 00000000 00000000 00000000其他的状态都是一样的左移操作,所以可以用高三位表示状态。因为这些数后29位都是0,所以可以和表示线程数量的参数进行或操作,这样得到的结构就是使用一个数就能同时保存当前的线程池状态和线程池中的线程数。

就像下面这样:

00000000   00000000   00000000   00000001
00100000   00000000   00000000   00000000    2^29
00011111   11111111   11111111   11111111    2^29-1  (最大线程数)
所以比如这样计算:ctlOf(stop,2)
00100000   00000000   00000000   00000000  
或
00000000   00000000   00000000   00000010
结果
00100000   00000000   00000000   00000010

另外上面的ctl的初始值可以看出,线程池默认的初始化值表示,线程池状态初始值为运行中RUNNING状态,线程池中线程个数初始化值为0

//线程池拒绝策略,当线程池中的线程处于饱和或者线程池处于关闭状态时,再提交任务时的拒绝策略
private volatile RejectedExecutionHandler handler;

//该参数表示线程池中应该存在的最大线程数,但是实际上最大线程数是2^29-1个,即CAPACITY参数,这里的maximumPoolSize是在构造方法创建线程池的过程中指定的,当线程池中存在的线程
//个数超过了maximumPoolSize就会触发interruptIdleWorkers终止多于的空闲线程
private volatile int maximumPoolSize;  

//线程池中的核心线程数, 保证这么多线程处于活跃状态不会空闲超时过期,但是如果设置了allowCoreThreadTimeOut,空闲超时的线程个数就不受该参数的限制了
private volatile int corePoolSize;

//是否允许核心线程空闲超时  
private volatile boolean allowCoreThreadTimeOut;  

//线程空闲的超时时间
private volatile long keepAliveTime;


五.重点-线程池原理分析

使用线程池可以避免线程的反复创建销毁,减小开销提高性能,线程池初始时有初始数量的线程数,并且会保持一定数量的核心线程数量,多余的线程在超出最大空闲时间会 被自动销毁,回收利用空间,线程池也具有很多拒绝策略,在线程池中的任务满了的时候,会按照设定的不同的拒绝策略处理当前提交到线程池中执行的任务.

上面这段话相信很多小伙伴都有看过类似的,很多文章,书籍就是这样介绍线程池的,有的文章说深入分析java线程池原理,源码分析啥的,都是像上面一样解释下一些参数的含义 然后说明下线程池的作用啥的,但是,我们是否想过,所谓线程池,提前初始化时创建好线程存放在哪里?为什么可以存放线程?为什么提交的任务可以在线程池现有的线程中运行,为什么 线程池中的线程可以超时销毁,不够核心线程数的时候还可以自动添加?这些和我们之前创建线程,然后执行start()来执行线程逻辑完全不一样,下面我就来详细和大家分析下 java线程池的原理.

1.线程池中的成员变量说明

private final BlockingQueue<Runnable> workQueue;  
private final HashSet<Worker> workers = new HashSet<Worker>();
private final ReentrantLock mainLock = new ReentrantLock();
private volatile RejectedExecutionHandler handler; 

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

下面还要来介绍下工作线程Worker,这是线程池自己内部实现的一个数据结构,它集成了AQS(AQS之前我们在juc中详细分析了,可以参考我之前的文章),所以Worker 其实也是一个独占锁,并且worker还实现了Runable接口,所以其本身就是一个线程执行逻辑 .

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        //Worker是一个工作线程,需要执行提交到线程池中的任务逻辑,所以其肯定代表一个线程的功能
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        //该worker需要执行的任务逻辑
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //构造方法中使用线程池中的线程工厂创建一个新的线程,并且这里看到传入的参数是this,也就是说当前worker中的线程
            //的执行逻辑是当前worker自己
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        //worker的执行逻辑很简单,就下面一段代码,精髓都在runWorker上,这里传入了this也就是worker自己,其实就足够了,因为执行任务就需要两个,线程和执行逻辑,worker全有了
        public void run() {
            runWorker(this);
        }

先介绍这么多,我们来写一个小例子

public class Test11 {
    public static void main(String[] args) {
        int corePoolSize = 2;
        int maxPoolSize = 4;
        long keepAliveTime = 3;
        TimeUnit unit = TimeUnit.SECONDS;
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,maxPoolSize, keepAliveTime,unit,workQueue,rejectedExecutionHandler);
        threadPool.setThreadFactory(new UserThreadFactory("54zh.cn"));
//        threadPool.allowCoreThreadTimeOut(true);
        threadPool.execute(() -> {
            System.out.println(Thread.currentThread().getName()+" 执行了第一件事");
        });
    }
}

上面这个程序运行后,会打印From UserThreadFactory's 54zh.cn-Worker-1 执行了第一件事,之后jvm不会退出,会一直运行状态,main进程也不是守护进程,但是执行到最后一个execute方法之后 main线程等于结束了,那么如果main线程是最后一个非守护线程,那么jvm应该会退出才对,所以这里打印之后还有其他的非守护线程是运行着的。但是如果我们将上面注释部分放开,那么打印之后,三秒后 jvm就会退出了,也就是说3s后jvm中已经没有非守护线程了。

ps:上面程序中我为线程池设置了线程创建工厂,这是阿里java开发手册中的建议,这样线程就可以有自己的名字,可以和业务关联,在后期的问题定位可以发挥作用。

上面这个例子很明显就是说明了之前我们介绍的参数的作用,keepAliveTime是3s,所以当执行完我们提交的任务后,线程池中的线程就是处于空闲状态,并且我们又设置了核心线程也会发生空闲超时销毁。

下面我们来分析下线程池的源码:

线程池的几个构造方法的重载最终调用的都是下面的方法

很明显构造方法中就是按照程序员设定的参数为当前new出来的线程池的一些属性变量赋值。这里我们看到,没有任何和线程相关的代码,都只是一些简单的参数的赋值而已。所以当你new一个线程池 出来后,线程池中其实只是多了一些初始参数值和线程创建工厂,拒绝策略等;

这里我贴一下自定义的线程工厂的代码(参考阿里巴巴java开发手册),很简单就是实现了线程工厂接口中的创建线程的方法,并且统一在创建线程的时候指定了名称

ps:所以这里其实也就是说明,我们创建线程池的时候最好和业务相关联。

public class UserThreadFactory implements ThreadFactory {
    private final String namePrefix;
    private final AtomicInteger nextId = new AtomicInteger(1);

    // 定义线程组名称,在 jstack 问题排查时,非常有帮助
    UserThreadFactory(String whatFeaturOfGroup) {
        namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
    }

    @Override
    public Thread newThread(Runnable task) {
        String name = namePrefix + nextId.getAndIncrement();
        return new Thread(task, name);
    }
}

execute方法就是来自上面第一个接口中方法的实现,向线程池中提交了一个执行逻辑

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取线程池的状态和容量ctl,ctl前面已经介绍这里不做赘述,假装你已经知道
        int c = ctl.get();
        //如果当前线程池的线程数小于核心线程数,前面我们介绍了线程数初始值为0,所以这一次提交任务肯定这个判断是true
        if (workerCountOf(c) < corePoolSize) {
            //所以第一次提交任务到线程池都会执行下面的方法addWorker(command, true),这表示
            // 通过向线程池添加工作线程来执行本次提交的任务(这里的true表示不允许线程池中的工作线程数超过maximumPoolSize,如果是false则表示不允许超过最大容量即可),下面我有详细分析  
            if (addWorker(command, true))
                return;
            //如果if为false,那么就重新获取下最新的当前容器的ctl,因为容器状态不是running上面addWorker也会返回false
            c = ctl.get();
        }
        //如果,当前线程池中的线程数大于等于核心线程数,并且还是运行状态,就提交任务到阻塞队列,并且当前提交的任务逻辑入队成功
        if (isRunning(c) && workQueue.offer(command)) {
            //再次获取ctl,因为入队这一会,可能当前突然就不是运行状态了,这里非常严谨
            int recheck = ctl.get();
            //如果不时运行状态,就移除前面添加进去的任务,执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //否则如果线程池中没有工作线程就添加一个空闲工作线程(执行逻辑是null,所以线程启动,run方法也没有任何执行步骤),这里是为了防止万一,虽然上面的if判断了但是,可能这期间刚好核心线程也都超时过期了,这里也能看出非常严谨
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果上面都失败了,那么就直接添加新工作线程来执行此次任务并且不受maximumPoolSize线程最大个数的限制,只要不超过最大容量就好了,因为最大容量是2^29-1,所以可以任务没有限制
        else if (!addWorker(command, false))
            //如果还是失败了就直接执行拒绝策略(可能是线程池状态问题,下面有详细分析)
            reject(command);
    }

private boolean addWorker(Runnable firstTask, boolean core) {
        //下面的两个嵌套的自旋是为了修改线程池中的线程个数,因为需要将线程个数加1
        retry:
        for (;;) {
            //获取到线程池当前容量和状态
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果线程池不是运行状态,且满足至少下面三个的其中一个: 
            //1.线程池不是SHUTDOWN状态 2.初始任务不是null 3.线程池任务队列是空的
            //那么返回false,即表示通过向线程池添加工作线程来执行本次提交的任务失败
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //获取当前的线程个数
                int wc = workerCountOf(c);
                //如果线程个数已经超过了限制,也是返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //cas自增线程池中的线程数
                if (compareAndIncrementWorkerCount(c))
                    //如果cas更改成功,就直接退出第一个大的自旋,整个for循环退出
                    break retry;
                //如果cas失败,那么获取当前最新的ctl,因为cas失败说明期间有并发修改,所以需要获取下最新的数据,如果下面的if没有执行,那么就继续本次内自旋再次cas修改
                c = ctl.get();  // Re-read ctl
                //如果线程池状态变了,那么就直接回到第一个自旋,因为线程池状态可能已经不是运行状态了,判断线程状态的在第一个自旋中
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                
            }
        }
//也就是说,上面第一个外自旋是先判断线程池的状态,然后内自旋是cas自旋,为了修改线程池中的线程个数
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个工作线程,这里赋值了任务逻辑
            w = new Worker(firstTask);
            //获取worker中的线程
            final Thread t = w.thread;
            if (t != null) {
            //首先获取独占锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //获取线程池状态
                    int rs = runStateOf(ctl.get());
                    //如果是运行状态或者关闭状态但执行任务为空
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //如果线程已经启动了,那么说明当前已经在运行过其他任务逻辑,并不是第一次使用这个Worker,那么就抛出异常(因为我们现在讨论的都是新建一个Worker,所以一定
                        // 是第一次使用这个Worker的线程)  
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将当前新建的Worker添加到workers集合中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //为了执提交的新执行逻辑添加新的worker成功, 设置标志
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //如果添加了这个新的worker成功了就立刻让他开始工作
                    t.start();
                    //设置开始工作的标志为true
                    workerStarted = true;
                }
            }
        } finally {
                //如果worker启动失败了需要从worker集合中删除,addWorkerFailed就是做这个事的
            if (! workerStarted)
                addWorkerFailed(w);
        }
        //返回是否添加并启动worker成功
        return workerStarted;
    }