Future 与 FutureTask

标签: 并发 future juc | 发表时间:2014-03-11 22:23 | 作者:coderbee
出处:http://coderbee.net

Future

来自 Java DOC 文档:Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

也就是说Future具有这样的特性:

  • 异步执行,可用 get 方法获取执行结果。
  • 如果计算还没完成,get 方法是会阻塞的,如果完成了,是可以多次获取并立即得到结果的。
  • 如果计算还没完成,是可以取消计算的。
  • 可以查询计算的执行状态。

埋两个小问题用于设想下怎么实现Future:

  1. Future在计算完成前阻塞 get 访问,完成后可以自由访问,如何实现 get 方法?
  2. 计算的取消是怎么实现的?被取消的计算会终止执行吗?

FutureTask

FutureTask 是JUC里对Future的一个实现,还实现了 Runnable 接口,所以可以提交给线程池执行。

FutureTask 只有一个自定义的同步器 Sync 的属性,所有的方法都是委派给此同步器来实现。这也是JUC里使用AQS的通用模式。

下面的代码是 JDK 1.7.10 的源码。

FutureTask 实现

FutureTask的定义,省略了很多代码。

  public class FutureTask<V> implements RunnableFuture<V> {
    // FutureTask 用于同步控制
    private final Sync sync;

    public boolean isCancelled() {
        return sync.innerIsCancelled();
    }

    public boolean isDone() {
        return sync.innerIsDone();
    }

    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

    // 省略其他方法
}

FutureTask 的同步器

由于Future在任务完成后,可以多次自由获取结果,因此,用于控制同步的AQS使用共享模式。

FutureTask 底层任务的执行状态保存在AQS的状态里。AQS是否允许线程获取(是否阻塞)是取决于任务是否执行完成,而不是具体的状态值。

  private final class Sync extends AbstractQueuedSynchronizer {
    // 定义表示任务执行状态的常量。由于使用了位运算进行判断,所以状态值分别是2的幂。

    // 表示任务已经准备好了,可以执行
    private static final int READY     = 0;

    // 表示任务正在执行中
    private static final int RUNNING   = 1;

    // 表示任务已执行完成
    private static final int RAN       = 2;

    // 表示任务已取消
    private static final int CANCELLED = 4;


    // 底层的表示任务的可执行对象
    private final Callable<V> callable;

    // 表示任务执行结果,用于get方法返回。
    private V result;

    // 表示任务执行中的异常,用于get方法调用时抛出。
    private Throwable exception;

     /*
     * 用于执行任务的线程。在 set/cancel 方法后置为空,表示结果可获取。
     * 必须是 volatile的,用于确保完成后(result和exception)的可见性。
     * (如果runner不是volatile,则result和exception必须都是volatile的)
     */
    private volatile Thread runner;


     /**
     * 已完成或已取消 时成功获取
     */
    protected int tryAcquireShared( int ignore) {
        return innerIsDone() ? 1 : -1;
    }

    /**
     * 在设置最终完成状态后让AQS总是通知,通过设置runner线程为空。
     * 这个方法并没有更新AQS的state属性,
     * 所以可见性是通过对volatile的runner的写来保证的。
     */
    protected boolean tryReleaseShared( int ignore) {
        runner = null;
        return true;
    }


     // 执行任务的方法
    void innerRun() {
        // 用于确保任务不会重复执行
        if (!compareAndSetState(READY, RUNNING))
            return;

        // 由于Future一般是异步执行,所以runner一般是线程池里的线程。
        runner = Thread.currentThread();

        // 设置执行线程后再次检查,在执行前检查是否被异步取消
        // 由于前面的CAS已把状态设置RUNNING,
        if (getState() == RUNNING) { // recheck after setting thread
            V result;
            //
            try {
                result = callable.call();
            } catch (Throwable ex) {
                // 捕获任务执行过程中抛出的所有异常
                setException(ex);
                return;
            }
            set(result);
        } else {
      // 释放等待的线程
            releaseShared(0); // cancel
        }
    }

    // 设置结果
    void innerSet(V v) {
        // 放在循环里进行是为了失败后重试。
        for (;;) {
            // AQS初始化时,状态值默认是 0,对应这里也就是 READY 状态。
            int s = getState();

            // 已完成任务不能设置结果
            if (s == RAN)
                return;

            // 已取消 的任务不能设置结果
            if (s == CANCELLED) {
                // releaseShared 会设置runner为空,
                // 这是考虑到与其他的取消请求线程 竞争中断 runner
                releaseShared(0);
                return;
            }

            // 先设置已完成,免得多次设置
            if (compareAndSetState(s, RAN)) {
                result = v;
                releaseShared(0); // 此方法会更新 runner,保证result的可见性
                done();
                return;
            }
        }
    }

    // 获取异步计算的结果
    V innerGet() throws InterruptedException, ExecutionException {
        acquireSharedInterruptibly(0);// 获取共享,如果没有完成则会阻塞。

        // 检查是否被取消
        if (getState() == CANCELLED)
            throw new CancellationException();

        // 异步计算过程中出现异常
        if (exception != null)
            throw new ExecutionException(exception);

        return result;
    }

    // 取消执行任务
    boolean innerCancel( boolean mayInterruptIfRunning) {
        for (;;) {
            int s = getState();

            // 已完成或已取消的任务不能再次取消
            if (ranOrCancelled(s))
                return false;

            // 任务处于 READY 或 RUNNING
            if (compareAndSetState(s, CANCELLED))
                break;
        }
        // 任务取消后,中断执行线程
        if (mayInterruptIfRunning) {
            Thread r = runner;
            if (r != null)
                r.interrupt();
        }
        releaseShared(0); // 释放等待的访问结果的线程
        done();
        return true;
    }

    /**
     * 检查任务是否处于完成或取消状态
     */
    private boolean ranOrCancelled( int state) {
        return (state & (RAN | CANCELLED)) != 0;
    }

     // 其他方法省略
}

从 innerCancel 方法可知,取消操作只是改变了任务对象的状态并可能会中断执行线程。如果任务的逻辑代码没有响应中断,则会一直异步执行直到完成,只是最终的执行结果不会被通过get方法返回,计算资源的开销仍然是存在的。

总的来说,Future 是线程间协调的一种工具。

相关 [future futuretask] 推荐:

Future 与 FutureTask

- - 码蜂笔记
来自 Java DOC 文档:Future 表示异步计算的结果. 它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果. 计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法. 取消则由 cancel 方法来执行. 还提供了其他方法,以确定任务是正常完成还是被取消了.

Java中的Runnable、Callable、Future、FutureTask的区别

- - IT瘾-geek
Java中存在Runnable、Callable、Future、FutureTask这几个与线程相关的类或者接口,在Java中也是比较重要的几个概念,我们通过下面的简单示例来了解一下它们的作用于区别. 其中Runnable应该是我们最熟悉的接口,它只有一个run()函数,用于将耗时操作写在其中, 该函数没有返回值.

Java线程之FutureTask

- - zzm
FutureTask是Future和Callable的结合体. 然后通过Future来取得计算结果. 但是,若开启了多个任务,我们无从知晓哪个任务最先结束,因此,若要实现“当某任务结束时,立刻做一些事情,例如记录日志”这一功能,就需要写一些额外的代码. FutureTask正是为此而存在,他有一个回调函数protected void done(),当任务结束时,该回调函数会被触发.

java异步计算Future

- - 互联网 - ITeye博客
从jdk1.5开始我们可以利用Future来跟踪异步计算的结果. 在此之前主线程要想获得工作线程(异步计算线程)的结果是比较麻烦的事情,需要我们进行特殊的程序结构设计,比较繁琐而且容易出错. 有了Future我们就可以设计出比较优雅的异步计算程序结构模型:根据分而治之的思想,我们可以把异步计算的线程按照职责分为3类:.

拍摄回你的前半生:Back to the Future

- 仙人掌 - cnBeta.COM
每逢看看自己的旧照片,不禁为到岁月飞逝而怀缅过去. 阿根廷摄影师Irina Werning 组织了以Back to the Future(回到未来)为主题的拍摄项目,摄影师要求被拍者找到当年的故人,并且重现几十年前的场景.

java 线程池使用 Runnable&Callable&Future

- - Java - 编程语言 - ITeye博客
执行一次线程,调用Runnable接口实现.  当线程池执行Runnable后,返回的Future.get()总是null. DefaultRunnable代码如下:. 执行一次线程,调用Callable接口实现.  当线程池执行Callable时,返回的Future.get() 会返回Callable的返回值.

微软推新 Future Vision(未来愿景)概念视频

- ENOCH - LiveSino - LiveSide 中文版
接着著名的“微软未来愿景 2019”、微软下一代办公交互墙、未来实时协作等概念视频,微软今天又发布了一则新的视频,主题和 2019 完全相同 – 科技如何改善未来. 另外,视频中有展示相当多已经成熟的移动设备和 Metro UI 设计语言的应用场景. 微软正式完成 Skype 收购. 微软未来愿景 – 实时协作 2010 视频.

利用netty中的future获取异步执行的结果

- - Java - 编程语言 - ITeye博客
      前段时间使用netty3,感受到其对于future的设计在写异步操作时的高效与便捷,通过future与futurelistener的组合实现异步的通知. 这个在平时写异步执行代码的中经常用到.       其实JDK也有Future这个接口,是active object模式的一种实现. 最主要的思想就是让任务的调度和任务的执行分离.