线程池实际运用中的问题记录-监控数据异常

/ 默认分类 / 0 条评论 / 1226浏览

ps: 关于Java中的线程池的一些基础知识和源码分析已经在之前的文章中分析了,下面是记录一个工作中遇到的一个关于线程池使用的问题

一.场景介绍

我们目前做的项目类似与国内的支付宝,当然总体业务没有支付宝那样复杂,因为这个项目才上线三年左右,在这个产品中有一个子产品,可以看作是国内支付宝的花呗,这里我们会有一个定时任务每天凌晨批量处理计算用户的利罚息,我们使用的是线程池异步执行.下面是我写的一个简单的大致模拟结构:

/**
 * @author hi@54zh.cn 程序员大晖
 * @date 2022/4/5 - 20:44
 */
public class BatchTask implements Runnable {


    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(timeSleep);
            System.out.print(Thread.currentThread().getName()+"_"+batchNo+"_send"+"任务开始执行 ==> ");
            String countryCode = CountryCodeHolder.getCountryCode();
            sendFailCount.addAndGet(1);
            System.out.print(Thread.currentThread().getName()+"_"+batchNo+"_执行完毕"+countryCode);
        } catch (Exception e) {
            System.out.print(Thread.currentThread().getName()+"_"+batchNo+"_send任务开始执行异常"+e.getMessage());
        }
    }


    long timeSleep = 100;

    AtomicInteger sendFailCount;

    int batchNo;

    public BatchTask(long timeSleep) {
        this.timeSleep = timeSleep;
    }

    public BatchTask(long timeSleep, AtomicInteger sendFailCount) {
        this.timeSleep = timeSleep;
        this.sendFailCount = sendFailCount;
    }

    public BatchTask(long timeSleep, AtomicInteger sendFailCount, int batchNo) {
        this.timeSleep = timeSleep;
        this.sendFailCount = sendFailCount;
        this.batchNo = batchNo;
    }

}
public class CountryCodeHolder {
	

	private static final ThreadLocal<String> contextHolder = new InheritableThreadLocal<>();

	/**
	 * 赋值
	 * @param countryCode
	 */
	public static void setCountryCode(String countryCode) {
		if (null == countryCode){
			throw new NullPointerException("国家码参数不正确");
		}

		contextHolder.set(countryCode.toUpperCase());
	}

	/**
	 * 获取
	 * @return
	 */
	public static String getCountryCode() {
		if(null == contextHolder.get()) {
			throw new IllegalArgumentException("国家码未赋值");
		}
		return contextHolder.get();
	}

	/**
	 * 清除
	 */
	public static void clearCountryCode() {
		contextHolder.remove();
	}
	
}
/**
 * @author hi@54zh.cn 程序员大晖
 * @date 2022/4/5 - 23:21
 */
public class Listener {

    public int listen(){
        System.out.println("pThread"+Thread.currentThread().getName());
        AtomicInteger sendFailCount = new AtomicInteger(0);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 15, 2l, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("batch_"+ ThreadLocalRandom.current().nextInt(0,20000));
                return thread;
            }
        });
        pool.allowCoreThreadTimeOut(true);
        for (int i = 0; i < 1; i++) {
            BatchTask batchTask = new BatchTask(5000,sendFailCount,0);
            pool.submit(batchTask);
        }
        int u = pool.getActiveCount();


        //等待完成
        System.out.println("wait");
        while (pool.getActiveCount() != 0){
            System.out.print(".");
            System.out.print(".");
            System.out.print(".");
            System.out.print(".");
        }
        System.out.println("wait-end");
        System.out.println(Thread.currentThread().getName()+"【over sendFailCount】"+sendFailCount.intValue());
        System.out.println("listener main over   "+u + "v"+pool.getActiveCount());
        return 10;
    }

}

上面的代码,不难看出,最后面的打印,是为了在所有线程池中的任务执行完毕后,输出线程池本次执行的数据的情况,但是实际中,上面的代码执行很多次后,出现下面两种情况:

                while中   while后   任务结束
0   ->   1   ->   0   ->   1   ->   0
初始 while前(和中的一小会)

下面是具体的打印结果:

listener-parentstart listener
pThreadlistener-parent
wait
............wait-end
listener-parent【over sendFailCount】0
listener main over   1v1
oooooover10
batch_7412_0_send任务开始执行 ==> batch_7412_0_执行完毕CN
listener-parentstart listener
pThreadlistener-parent
wait
省略很多.............................................................................................................................batch_5724_0_send任务开始执行 ==> ......................batch_5724_0_执行完毕CN....................................wait-end
listener-parent【over sendFailCount】1
listener main over   1v0
oooooover10

二.分析

首先,对于上面的代码,我的目的就是希望可以在本次线程池执行任务结束后输出一些监控数据,这里我模拟了一些执行失败的计数,但是第一种情况下,我们可以看到,getActiveCount并没有达到预期的效果

这里我们先来看下getActiveCount的源码

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }
    
    
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
     
    public boolean isLocked() { return isHeldExclusively(); }
        

可以看到,就是对所有的workers进行一个计数。
那么很容易联想到,worker是怎样放进去的,什么时候什么情况下会被放进去,当然,关于线程池的这些原理问题,我早在另一篇文章中分析过了,感兴趣的小伙伴可以去看下(搜索线程池),

private boolean addWorker(Runnable firstTask, boolean core) {

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker类是这样的(这里只抽出了关键部分,具体见我的另一篇专门探讨线程池原理的博客):

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        Worker(Runnable firstTask) {
           -->(1) setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        -->(2)   w.unlock(); // allow interrupts //-->(2) 
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                -->(3)   w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    -->(4)   w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

重点的部分我使用-->进行了标注,具体的lock和unlock代码是Worker实现的,lock就是设置state为1,unlock 就是设置state为0,所以整个submit到执行完毕的过程state是这样变化的:

event:submit后new Worker -->  执行任务前unlock -->  获取task的while中lock --> 执行任务 --> 执行完毕unlock
state:     -1                      0                        1               保持不变还是1       0

三.异常原因解释

所以这就可以解释,为什么我们使用getActiveAcount会得到不同的运行结果,也许会有人说,submit是同步方法,至少在任务提交成功前是这样,是的,没错,但是上面的分析可以看到,addWorker中就已经开启Worker线程了,所以后面的从(2)开始,就已经都是异步的了(相比于submit的main线程)。

所以刚开始的时候submit后立即getActiveAcount,可以认为state是-1,所以getActiveAcount=1,之后submit执行完毕,开始getActiveAcount得到while循环,可能刚开始的几次循环,异步执行runWorker还没有执行到unlock,所以getActiveAcount还是1,但是经过几次循环后,runWorker执行了unlock但是还没有执行到lock,在这个时间间隙中,while循环中的getActiveAcount获取到的就是0,所以跳出了while循环,跳出后,runWorker执行了lock,getActiveAcount又得到的是1了,所以就产生了上面的第一种异常情况。