Java中的FutureTask

/ 默认分类 / 0 条评论 / 54浏览

一.类关系

先来看下相关接口的定义

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

Runnable和Callable都是可以作为线程执行逻辑的最小封装单元,区别就是Callable 接口定义了一个带返回值和能够抛出异常的 call() 方法,而 Runnable 接口定义了一个无返回值且不能抛出检查异常的 run() 方法。

另外可以看到,RunableFuture的实现类实际上可以作为Runnable或者Future来使用,因为它同时集成了Runnable接口和Future接口。

二.FutureTask构造方法

RunnableFuture接口的其中一个重要的实现类就是FutureTask。如下所示:

FutureTask的构造方法如下所示:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

其中的Executors.callable()方法中就是在对runnable进行适配为callable,具体如下:

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

三.FutureTask的状态转换

另外我们再来看下FutureTask中的state状态的变化图

四.主要源码阅读

4.1 run方法

状态变化可以先有个大概的印象即可,下面我们来看下FutureTask中的run方法的实现:

public void run() {
    //如果状态不是NEW或者当前线程未能成功设置runner,则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        //获取Callable对象
        Callable<V> c = callable;
        //如果Callable对象不为null且当前task状态为NEW,则执行调用
        if (c != null && state == NEW) {
            V result; //定义结果变量
            boolean ran; //定义是否成功执行的标志
            try {
                result = c.call(); //调用Callable的call方法
                ran = true; //标记为成功执行
            } catch (Throwable ex) {
                result = null; //发生异常,设置结果为null
                ran = false; //标记为未成功执行
                setException(ex); //设置异常
            }
            //如果执行成功,则设置结果
            if (ran)
                set(result);
        }
    } finally {
        //在状态稳定之前,runner必须非空以防止并发调用run()
        runner = null;
        //在将runner置为null之后,必须重新读取状态以防止中断泄露
        int s = state;
        //如果状态大于等于INTERRUPTING,则处理可能的取消中断
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Q&A

问题:为什么FutureTask中的操作需要使用Unsafe中的cas操作?一个FutureTask可能会被多个线程操作吗?

一个FutureTask可能会被多个线程操作。FutureTask是设计用来处理并发操作的,因此它可以被多个线程执行不同的操作,如执行任务、取消任务等。再比如上述的run方法中,开始的时候判断了state,执行完cas之后,在下面的if中又判断了一次state,然后才开始run的逻辑。

//将异常设置到任务输出中,并修改状态为异常
protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
//这是正常执行完毕之后调用的,用于将执行逻辑中的输出设置到task的输出中,并设置状态为NORMAL
//结合上面我们说的状态的流转逻辑
protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

4.2 get方法

get方法就是获取FutureTask执行的结果

// 无参 get 方法,用于获取任务的结果
public V get() throws InterruptedException, ExecutionException {
    // 读取当前任务的状态
    int s = state;
    
    // 如果任务尚未完成(状态小于等于 COMPLETING)
    if (s <= COMPLETING)
        // 等待任务完成,false 表示不设置超时,0L 表示无限等待
        s = awaitDone(false, 0L);
    
    // 根据最终状态返回结果或抛出异常
    return report(s);
}

/**
 * 带超时的 get 方法,用于在指定时间内获取任务的结果
 * @throws CancellationException 如果任务被取消
 * @throws TimeoutException 如果等待超时
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    
    // 检查时间单位是否为空,如果为空,抛出空指针异常
    if (unit == null)
        throw new NullPointerException();
    
    // 读取当前任务的状态
    int s = state;
    
    // 如果任务尚未完成,并尝试等待指定时间
    if (s <= COMPLETING &&
        // 将超时时间转换为纳秒并传递给 awaitDone 方法
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        // 如果任务在超时时间内仍未完成,抛出超时异常
        throw new TimeoutException();
    
    // 根据最终状态返回结果或抛出异常
    return report(s);
}