哪个线程执行 CompletableFuture’s tasks 和 callbacks?

标签: 基础技术 多线程 | 发表时间:2016-07-04 12:45 | 作者:liuchi1993
出处:http://www.importnew.com

CompletableFuture尽管在2014年的三月随着Java8被提出来,但它现在仍然是一种相对较新潮的概念。但也许这个类不为人所熟知是好事,因为它很容易被滥用,特别是涉及到使用线程和线程池的时候。而这篇文章的目的就是要描述线程是怎样使用 CompletableFuture的。

Running tasks

这是API的基础部分,它有一个很实用的supplyAsync()方法,这个方法和ExecutorService.submit()很像,但不同的是返回CompletableFuture:

CompletableFuture.supplyAsync(() -> {
            try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {
                log.info("Downloading");
                return IOUtils.toString(is, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

问题是supplyAsync()默认使用  ForkJoinPool.commonPool(),线程池由所有的CompletableFutures分享,所有的并行流和所有的应用都部署在同一个虚拟机上(如果你很不幸的仍在使用有很多人工部署的应用服务器)。这种硬编码的,不可配置的线程池完全超出了我们的控制,很难去监测和度量。因此你应该指定你自己的 Executor,就像这里(也可以看看这里 几种创造这样Exetutor的方法):

ExecutorService pool = Executors.newFixedThreadPool(10);

final CompletableFuture future =
        CompletableFuture.supplyAsync(() -> {
            //...
        }, pool);

这仅仅是开始…

Callbacks and transformations

假如你想转换给定的CompletableFuture,例如提取String的长度:

CompletableFuture intFuture =
    future.thenApply(s -> s.length());

那么是谁调用了 s.length()?坦白点,我一点也不在乎。只要涉及到lambda表达式,那么所有的执行者像thenApply这样的就是廉价的,我们并不关心是谁调用了lambda表达式。但如果这样的表达式会占用一点点的CPU来完成阻塞的网络通信那又会如何呢?

首先默认情况下会发生什么?试想一下:我们有一个返回String类型的后台任务,当结果完成时我们想要异步地去执行特定的变换。最容易的实现方法是通过包装一个原始的任务(返回String),任务完成时截获它。当内部的task结束后,回调就开始执行,执行变换和返回改进的值。就像有一个面介于我们的代码和初始的计算结果之间(个人看法:这里指的是下面的future里面包含的task执行完毕返回结果s,然后立马执行callback也就是thenApply里面的lambda表达式,这也就是为什么作者说有一个面位于初始计算结果和回调执行代码之间)。那就是说这应该相当明显了,s.length()的变换会在和执行原始任务相同的线程里完成,哈?并不完全是这样!(这里指的是有时候变换的线程和执行原始任务的线程不是同一个线程,看下面就知道)

CompletableFuture future =
        CompletableFuture.supplyAsync(() -> {
            sleepSeconds(2);
            return "ABC";
        }, pool);

future.thenApply(s -> {
    log.info("First transformation");
    return s.length();
});

future.get();
pool.shutdownNow();
pool.awaitTermination(1, TimeUnit.MINUTES);

future.thenApply(s -> {
    log.info("Second transformation");
    return s.length();
});

如果future里面的task还在运行,那么包含first transformation的 thenApply()就会一直处于挂起状态。而这个task完成后thenApply()会立即执行,执行的线程和执行task的线程是同一个。然而在注册第二次变换之前(也就是执行第二个thenApply()),我们将一直等待直到task完成(和第一个变换是一样的,都需要等待)。更坏的情况是,我们完全地关闭了线程池,保证其他的代码将不会执行。那么哪个线程将要执行二次变换呢?我们都知道当注册了callback的future完成时,二次变换必定会立刻执行。这就是说它是使用默认的主线程(来完成callback),上面的代码输出如下:

pool-1-thread-1 | First transformation      main | Second transformation

二次变换在注册的时候就意识到CompletableFuture已经完成了(指的是future里面的task已经返回结果,其实在第一次调用thenApply()之前就已经返回了,所以这一次不用等待task),因此它立刻执行了变换。由于此时已经没有其他的线程,所以thenApply()就只能在当前的main线程环境中被调用。最主要的原因还是因为这种行为机制在实际的变换成本很高时(如很耗时)很容易出错。想象一下thenApply()内部的lambda表达式在进行一些繁重的计算或者阻塞的网络调用,突然我们的异步  CompletableFuture阻塞了调用者线程!

Controlling callback’s thread pool

有两种技术去控制执行回调和变换的线程,需要注意的是这些方法仅仅适用你的变换需要很高成本的时候,其他情况下可以忽略。那么第一个方法可以选择使用操作者的  *Async方法,例如:

future.thenApplyAsync(s -> {
    log.info("Second transformation");
    return s.length();
});

这一次second transformation被自动地卸载到了我们的老朋友线程ForkJoinPool.commonPool()中去了:

pool-1-thread-1                  | First transformation
ForkJoinPool.commonPool-worker-1 | Second transformation

但我们并不喜欢commonPool,所以我们提供自己的:

future.thenApplyAsync(s -> {
    log.info("Second transformation");
    return s.length();
}, pool2);

注意到这里使用的是不同的线程池(pool-1 vs. pool-2):

pool-1-thread-1 | First transformation
pool-2-thread-1 | Second transformation

Treating callback like another computation step

我相信如果你在处理一些长时间运行的callbacks和transformations上有些麻烦(记住这篇文章同样也适用于CompletableFuture的其他大部分方法),你应该简单地使用其他表意明确的CompletableFuture,就像这样:

//Imagine this is slow and costly
CompletableFuture<Integer> strLen(String s) {
    return CompletableFuture.supplyAsync(
            () -> s.length(),
            pool2);
}

//...

CompletableFuture<Integer> intFuture =
        future.thenCompose(s -> strLen(s));

这种方法更加明确,知道我们的变换有很大的开销,我们不会将它运行在一些随意的不可控的线程上。取而代之的是我们会将String到CompletableFuture<Integer>的变换封装为一个异步操作。然而,我们必须用thenCompose()取代thenApply(),否则的话我们会得到CompletableFuture<CompletableFuture<Integer>>.

但如果我们的transformation 没有一个能够很好地处理嵌套CompletableFuture的形式怎么办,如applyToEither()会等待第一个 Future完成然后执行transformation.

CompletableFuture<CompletableFuture<Integer>> poor =
        future1.applyToEither(future2, s -> strLen(s));

这里有个很实用的技巧,用来“展开”这类难以理解的数据结构,这种技巧叫flatten,通过使用 flatMap(identity) (or  flatMap(x -> x))。在我们的例子中flatMap()就叫做thenCompose:

CompletableFuture<Integer> good =
        poor.thenCompose(x -> x);

我把它留给你,去弄懂它是怎样和为什么这样工作的。我想这篇文章已经尽量清楚地阐述了线程是如何参与到CompletableFuture中去的。

相关文章

相关 [线程 completablefuture tasks] 推荐:

哪个线程执行 CompletableFuture’s tasks 和 callbacks?

- - ImportNew
CompletableFuture尽管在2014年的三月随着Java8被提出来,但它现在仍然是一种相对较新潮的概念. 但也许这个类不为人所熟知是好事,因为它很容易被滥用,特别是涉及到使用线程和线程池的时候. 而这篇文章的目的就是要描述线程是怎样使用 CompletableFuture的. 这是API的基础部分,它有一个很实用的supplyAsync()方法,这个方法和ExecutorService.submit()很像,但不同的是返回CompletableFuture:.

Java CompletableFuture 详解

- - 鸟窝
Future是Java 5添加的类,用来描述一个异步计算的结果. 你可以使用 isDone方法检查计算是否完成,或者使用 get阻塞住调用线程,直到计算完成返回结果,你也可以使用 cancel方法停止任务的执行. 虽然 Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果.

Google Tasks Porter 可帮你导入/导出 Google Tasks 数据

- lichzy - 谷奥——探寻谷歌的奥秘
Google Tasks Porter作为 Data Liberation Front 的一部分发布,它可以帮助你导入/导出 Google Tasks 数据. Google Tasks Porter支持导入/导出的数据格式包括:. Microsoft Outlook(通过CSV). Remember the Milk(通过iCalendar导出,通过邮件导入).

Google Tasks 可多台電腦共用的雲端「便利貼」

- Lin - 重灌狂人
回首頁 | 訂閱新文章 | 常用軟體 | 科技新聞 | 隨機文章 |. Google Tasks 可多台電腦共用的雲端「便利貼」. 在 Gmail 與 Google 日曆網站裡面有個「工作表」的功能,可以讓我們隨時記錄待辦事項、工作清單或各種簡短的備忘錄,不過一般情形下只能開啟 Gmail 或 Google 日曆網站才能使用,這樣是有點小小不方便,如果希望能隨時按一下就能新增記事或備忘錄的話,該怎麼做比較好呢.

MySQL Replication 线程

- - CSDN博客推荐文章
Replication 线程. Mysql 的Replication 是一个异步的复制过程,从一个Mysql instace(我们称之为Master)复制到另一个Mysql instance(我们称之Slave). 在Master 与Slave 之间的实现整个复制过程主. 要由三个线程来完成,其中两个线程(Sql 线程和IO 线程)在Slave 端,另外一个线程(IO 线程)在Master 端.

Java线程池

- - 企业架构 - ITeye博客
线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的. 在jdk1.5之后这一情况有了很大的改观. Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用. 为我们在开发中处理线程的问题提供了非常大的帮助.

Java 线程池

- - 编程语言 - ITeye博客
在项目中,系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互. 在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存周期很短的线程时,更应该考虑使用线程池. 使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过此数.

Java线程(四):线程中断、线程让步、线程睡眠、线程合并

- - 编程语言 - ITeye博客
 最近在Review线程专栏,修改了诸多之前描述不够严谨的地方,凡是带有Review标记的文章都是修改过了. 本篇文章是插进来的,因为原来没有写,现在来看传统线程描述的不太完整,所以就补上了. 理解了线程同步和线程通信之后,再来看本文的知识点就会简单的多了,本文是做为传统线程知识点的一个补充. 有人会问:JDK5之后有了更完善的处理多线程问题的类(并发包),我们还需要去了解传统线程吗.

Java Thread多线程

- - CSDN博客推荐文章
Java Thread多线程. Java 多线程例子1 小例子. super("zhuyong");//设置线程的名字,默认为“TestThread”. Java 多线程例子2 前台线程(用户线程) 后台线程(守护线程 ). 1,setDaemon(true)后就是后台线程(守护线程 ),反之就是前台线程(用户线程).

Android线程大坑

- - 移动开发 - ITeye博客
     android界面的更新实在主线程进行的,通常把主线程也叫UI线程,UI线程里进行事件的分发和交互. 在UI线程中进行耗时操作,比如网络请求,IO操作等会阻塞UI线程,界面会卡住,并且超过大概5秒钟程序会ANR(Application Not Responding),也就是死掉. 其实这种GUI单线程的思想在我上一篇博客(http://zyqwst.iteye.com/blog/2262011)都有阐述,道理一模一样,只是android实现的方式上略有不同,所以我建议把上一篇Swing线程的博客能够阅读一遍,Android线程的问题豁然开朗,始终晋级GUI开发的原则:在UI线程中进行界面的更新操作,在单独线程中进行耗时操作.