利用Java SE 8流处理数据II(译)

标签: 利用 java se | 发表时间:2014-08-15 19:57 | 作者:Sha Jiang
出处:http://www.blogjava.net/
利用Java SE 8流处理数据
-- 结合Stream API的高级操作去表示富数据处理查询

本文是 Java Magazine 201405/06刊中的一篇文章,也是文章系列"利用Java SE 8流处理数据"中的第二篇,它基于flatMap()和collect()介绍了Java流的高级用法(2014.08.15最后更新)

在本系列的第一篇文章中,你看到了Java流让你能够使用与数据库操作相似的方法去处理集合。作为一个复习,清单1的例子展示了如何使用Stream API去求得大交易的金额之和。我们组建了一个管道,它由中间操作(filter和map)与最终操作(reduce)构成,图1形象地展示它。
清单1
int sumExpensive =
        transactions.stream()
        .filter(t -> t.getValue() > 1000)
        .map(Transaction::getValue)
        .reduce(0, Integer::sum);
图1

然而在系列的第一部分中,并没有研究这两个方法:
flatMap:这是一个中间操作,它允许将一个"map"和一个"flatten"操作结合在一起
collect:这是一个最终操作,它依据不同的方式,将流中的元素归集为一个结果。
这两个方法对于表达更为复杂的查询是十分有用的。例如,你可以将flatMap和collect结合起来,生成代表一个文字流中每个字母出现的次数的Map对象,如清单2所示。如果第一次看到这段代码觉得很惊奇时,但请不要担心。本文的目的就是要解释并探究这两个方法更多的细节。
清单2
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.*;

Stream<String> words = Stream.of("Java", "Magazine", "is", "the", "best");
Map<String, Long> letterToCount =
        words.map(w -> w.split(""))
        .flatMap(Arrays::stream)
        .collect(groupingBy(identity(), counting()));
清单2中的代码将会生成如清单3示的结果。棒极了,不是吗?让我们开始探究flatMap和collect方法是如何工作的。
清单3
[a:4, b:1, e:3, g:1, h:1, i:2, ..]

flatMap方法
假设你想找出文件中所有独一唯二的字。你会怎么做呢?
你可能认为这很简单;我们可以Files.lines(),在前面的文章中已见过了这个方法,因为它会返回一个包含文件中所有行的流。然后我们就可以使用map方法将每一行拆分成字,最后再使用distinct方法去除重复的字。第一次尝试得到的代码可能如清单4所示。
清单4
Files.lines(Paths.get("stuff.txt"))
        .map(line -> line.split("\\s+")) // Stream<String[]>
        .distinct() // Stream<String[]>
        .forEach(System.out::println);
不幸的是,这段程序并不十分正确。如果运行它,会得到令人生疑的结果,与下面的输出有些类似:
[Ljava.lang.String;@7cca494b
[Ljava.lang.String;@7ba4f24f
...
我们的第一次尝试确实打印出了代表几个流对象的字符串。那发生了什么呢?该方法的问题是,传给map方法的Lambda表达式返回的是文件中每一行的String数组(String[])。而我们真正想要的是一个表示文字的流的Stream<String>对象。
幸运的是,对于该问题有一个解决方案,就是使用flatMap方法。让我们一步一步地看看如何得到正确的解决方法。
首先,我们需要字的流,而不是数组的流。有一个名为Arrays.stream()的方法,它将使用一个数组作为参数,并生成一个流。请看清单5中的例子。
清单5
String[] arrayOfWords = {"Java", "Magazine"};
Stream<String> streamOfwords = Arrays.stream(arrayOfWords);
让我们在前面的流管道中使用该方法,看看会发生什么(见清单6)。这个方案依然行不通。那是因为我们最终得到的是一组流的流(准确地说,就是Stream<Stream<String>>)。确切地是,我们首先将每一行转换为一个字的数组,然后使用方法Arrays.stream()将每一个数组转换成一个流。
清单6
Files.lines(Paths.get("stuff.txt"))
       .map(line -> line.split("\\s+")) // Stream<String[]>
       .map(Arrays::stream) // Stream<Stream<String>>
       .distinct() // Stream<Stream<String>>
       .forEach(System.out::println);
我们使用flatMap()方法去解决这个问题,如清单7所示。使用flatMap()方法能够用流中的内容,而不是流去替换每一个生成的数组。换言之,通过map(Arrays::stream)方法生成的全部独立的流被合并或"扁平化"为一个流。图2形象地展示了使用flatMap()方法的效果。
清单7
Files.lines(Paths.get("stuff.txt"))
       .map(line -> line.split("\\s+")) // Stream<String[]>
       .flatMap(Arrays::stream) // Stream<String>
       .distinct() // Stream<String>
       .forEach(System.out::println);
本质上,flatMap让你可以使用其它流去替换另一个流中的每个元素,然后再将所有生成的流连合并为一个流。
请注意,flatMap()是一个通用的模式,在使用Optaional或CompletableFuture时,你还会看到它。

collect方法
现在让我们看看collect方法的更多细节。在本系列的第一篇文章中你所看到的方法,要么返回另一个流(即,这些方法是中间操作),要么返回一个值,例如一个boolean,一个int,或一个Optional对象(即,这些方法是最终操作)。
collect就是一个最终方法,但它有点儿不同,因为你可以用它将一个Stream对象转为一个List对象。例如,为了得到一个包含有所有高金额交易ID的列表,你可以使用像清单8那样的代码。
清单8
import static java.util.stream.Collectors.*;

List<Integer> expensiveTransactionsIds =
        transactions.stream()
        .filter(t -> t.getValue() > 1000)
        .map(Transaction::getId)
        .collect(toList());
传递给collect方法的参数就是一个类型为java.util.stream.Collector的对象。这个Collector对象是干什么的?本质上看,它描述了如何按照需要去收集流中的元素,再将它们生成为一个最终结果。之前用到的工厂方法Collector.toList()会返回一个Collector对象,它描述了如何将一个Stream对象归集为一个List对象。而且,Collctors内建有有许多相似的方法。例如,使用toSet()方法可以将一个Stream对象转化为一个Set对象,它会删除所有重复的元素。清单9中的代码展示了如何生成一个仅仅包含高金额交易所在城市的Set对象。(注意:在后面的例子中,我们假设Collectors类中的工厂方法都已通过语句import static java.util.stream.Collectors.*被静态引入了)
清单9
Set<String> cities =
        transactions.stream()
        .filter(t -> t.getValue() > 1000)
        .map(Transaction::getCity)
        .collect(toSet());
注意,无法保证会返回何种类型的Set对象。但是,通过使用toCollection(),你可以进行更多的控制。例如,若你想得到一个HashSet,可以传一个构造器给toCollection方法(见清单10)。
清单10
Set<String> cities =
        transactions.stream()
        .filter(t -> t.getValue() > 1000)
        .map(Transaction::getCity)
        .collect(toCollection(HashSet::new));
然而,这并不是你能用collect和Collector所做的全部事情。实际上,这只是你能用它们所做的事情中的极小部分。下面是一些你所能表达的查询的例子:
将交易按货币分类,并计算每种货币的交易金额之和(返回一个Map<Currency, Integer>对象)
将交易划分成两组:高金额交易和非高金额交易(返回一个Map<Boolean, List<Transaction>>对象)
创建多层分组,例如先按交易发生的城市分组,再进一步按它们是否为高金额交易进行分组(返回一个Map<String, Map<Boolean, List<Transaction>>>)
兴奋吗?很好。让我们看看,你是如何使用Stream API和Collector来表达上述查询的。我们首先从一个简单的例子开始,这个例子要对这个流进行"总结":计算它的平均值,最大值和最小值。然后我们再看看如何表达简单的分组,最后,再看看如何将Collector组合起来去创建更为强大的查询,例如多层分组。
总结。让我们用一些简单的例子来热身一下。在之前的文章中,你已经看到如何使用reduce方法去计算流中元素的数量,最小值,最大值和平均值,以及如何使用基本数据类型元素的流。有一些预定义的Collector类也能让你完成那些功能。例如,可以使用counting()方法去计算元素的数量,如清单11所示。
清单11
long howManyTransactions = transactions.stream().collect(counting());
你可以使用summingDouble(),summingInt()和summingLong()分别对流中元素类型为Double,Int或Long的属性求和。在清单12中,我们计算出了所有交易的金额之和。
清单12
int totalValue = transactions.stream().collect(summingInt(Transaction::getValue));
类似的,使用averagingDouble(),averagingInt()和averagingLong()去计算平均值,如清单13所示。
清单13
double average = transactions.stream().collect(averagingInt(Transaction::getValue));
另外,使用maxBy()和minBy()方法,可以计算出流中值最大和最小的元素。但这里有一个问题:你需要为流中元素定义一个顺序,以能够对它们进行比较。这就是为什么maxBy()和minBy()方法使用使用一个Comparator对象作为参数,图3表明了这一点。
图3

在清单14的例子中,我们使用了静态方法comparing(),它将传入的函数作为参数,从中生成一个Comparator对象。该函数用于从流的元素中解析出用于进行比较的关键字。在这个例子中,通过使用交易金额作为比较的关键字,我们找到了那笔最高金额的交易。
清单14
Optional<Transaction> highestTransaction =
        transactions.stream()
        .collect(maxBy(comparing(Transaction::getValue)));
还有一个reducing()方法,由它产生的Collector对象会把流中的所有元素归集在一起,对它们重复的应用同一个操作,直到产生结果。该方法与之前看过的reduce()方法在原理上一样的。例如,清单15展示了使用了基于reducing()方法的另一种方式去计算所有交易的金额之和。
清单15
int totalValue = transactions.stream().collect(reducing(0, Transaction::getValue, Integer::sum));
reducing()方法使用三个参数:
初始值(如果流为空,则返回它);此处,该值为0。
应用于流中每个元素的函数对象;此处,该函数会解析出每笔交易的金额。
将两个由解析函数生成的金额合并在一起的方法;此处,我们只是把金额加起来。
你可能会说,"等等,使用其它的流方法,如reduce(),max()和min(),我已经可以做到这些了。那么,你为什么还要给我看这些方法呢?"后面,你将会看到我们将Collector结合起来去构建更为复杂的查询(例如,对加法平均数进行分组),所以,这也能更易于理解这些内建的Collector。
分组。这是一个普通的数据库查询操作,它使用属性去数据进行分组。例如,你也许想按币种对一组交易进行分组。若你使用如清单16所示的代码,通过显式的遍历去表达这个查询,那会是很痛苦的。
清单16
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap< >();
for(Transaction transaction : transactions) {
    Currency currency = transaction.getCurrency();
    List<Transaction> transactionsForCurrency =
    transactionsByCurrencies.get(currency);

    if (transactionsForCurrency == null) {
        transactionsForCurrency = new ArrayList<>();
        transactionsByCurrencies.put(currency, transactionsForCurrency);
    }
    transactionsForCurrency.add(transaction);
}
你首先需要创建一个Map对象,它将收集所有的交易记录。然后,你需要遍历所有的交易记录,并解析出每笔交易的币种。在将交易记录使用一个值插入Map中之前,需要先检查一下,这个List是否已经创建过了,等等。
真是令人汗颜啊,因为我们想要是"按币种对交易进行分组"。为什么不得不涉及这么多代码呢?有好消息:有一个称为groupingBy()的Collector,它允许我们以简洁的方式来表达这个例子。我们可以使用清单17中的例子来表达这个相同的查询,现在代码的阅读更接近问题语句了。
清单17
Map<Currency, List<Transaction>> transactionsByCurrencies =
        transactions.stream().collect(groupingBy(Transaction::getCurrency));
工厂方法groupingBy()使用一个函数对象作为参数,该函数会解析出用于分类交易记录的关键字。我们称为这个函数为分类函数。在此处,为了按币种对交易进行分组,我们传入一个方法引用,Transaction::getCurrency。图4演示了这个分组操作。
图4

分割。有一个称为partitioningBy()的工厂方法,它可被视为一种特殊的groupingBy()方法。它使用一个谓语作为参数(该参数返回一个boolean值),然后根据元素是否满足这个谓语对它们进行分组。换言之,它将组成流的交易分割成了结构Map<Boolean, List<Transaction>>。例如,如若你想将交易分割成两组--低廉的和昂贵的--你就可以像清单18那样去使用partitioningBy()产生的Collector。此例中的Lambda表达式,t->t.getValue() > 1000,就是一个将交易分成低廉和昂贵的谓语。
清单18
Map<Boolean, List<Transaction>> partitionedTransactions =
        transactions.stream().collect(partitioningBy(t -> t.getValue() > 1000));
组合Collector。如果你熟悉SQL,你应该知道可以将GROUP BY与函数COUNT()和SUM()一块儿使用,以按币种和交易金额之和进行分组。那么,使用Stream API是否也可以实现相似的功能呢?当然可以。确切地说,有一个重载的groupingBy()方法,它使用另一个Collector作为第二个参数。这个额外的Collector对象用于定义在使用由groupingBy()产生的Collector时如何汇集所有与关键字相关的元素。
好吧,这听起来有些抽象,那么让我们看一个简单的例子。我们想基于每个城市的交易金额之和生成一个城市的Map对象(见清单19)。在此处,我们告诉groupingBy()方法使用getCity()方法作为分类方法。那么,得到的Map结果的Key就为城市。正常地,我们期望对Map中每个键所对应的值,即List<Transaction>对象,使用groupingBy()方法。
清单19
Map<String, Integer> cityToSum =
        transactions.stream().collect(groupingBy(Transaction::getCity,
        summingInt(Transaction::getValue)));
然后,我们却是传入了另一个Collector对象,它由summingInt()方法产生,该方法会将所有与特定城市相关的交易记录的金额加起来。结果,我们得到了一个Map<String, Integer>对象,它将每个城市与它们对应的所有交易的金额之和进行了映射。酷,不是吗?想想这个:基本的groupingBy(Transaction:getCity)方法其实就只是groupingBy(Transaction:getCity, toList())的简写。
让我们看看另一个例子。如果你想生成这样一个Map,它对每个城市与它的最大金额的交易记录进行映射,那要怎么做呢?你可能已经猜到了,我们可以重用前面过的由maxBy()方法产生的Collector,如清单20所示。
清单20
Map<String, Optional<Transaction>> cityToHighestTransaction =
        transactions.stream().collect(groupingBy(Transaction::getCity,
        maxBy(comparing(Transaction::getValue))));
你已经看到Stream API很善于表达,我们正在构建的一些十分有趣的查询都可以写的简洁些。你还能想象出回到从前去遍历地处理一个集合吗?让我们看一个更为复杂的例子,以结束这篇文章。你已看到groupingBy()方法可以将一个Collector对象作为参数,再根据进一步的分类规则去收集流中的元素。因为groupingBy()方法本身得到的也是一个Collector对象,那么通过传入另一个由groupingBy()方法得到的Collector对象,该Collector定义了第二级的分类规范,我们就能够创建多层次分组。
在清单21的代码中,先按城市对交易记录进行分组,再进一步对每个城市中的交易记录按币种进行分组,以得到每个城市中每个币种的所有交易记录的平均金额。图5就形象地展示了这种机制。
清单21
Map<String, Map<Currency, Double>> cityByCurrencyToAverage =
        transactions.stream().collect(groupingBy(Transaction::getCity,
        groupingBy(Transaction::getCurrency,  
        averagingInt(Transaction::getValue))));
图5

创建你自己的Collector。到目前为止,我们展示的全部Collector都实现了接口java.util.stream.Collector。这就意味着,你可以实现自己的Collector,以"定制"归一操作。但是对于这个主题,再写一篇文章可能更合适一些,所以我们不会在本文中讨论这个问题。

结论
在本文中,我们探讨了Stream API中的两个高级:flatMap和collect。它们是可以加到你的兵器库中的工具,可以用来表述丰富的数据处理查询。
特别地,你也已经看到了,collect()方法可被用于归纳,分组和分割操作。另外,这些操作还可能被结合在一起,去构建更为丰富的查询,例如"生产一个两层Map对象,它代表每个城市中每个币种的平均交易金额"。
然而,本文也没有查究到所有的内建Collector实现。请你去看看Collectors类,并试试其它的Collector实现,例如由mapping(),joining()和collectingAndThen(),也许你会发现它们也很有用。


Sha Jiang 2014-08-15 19:57 发表评论

相关 [利用 java se] 推荐:

Java SE 7发布

- Jingzhi - Solidot
甲骨文宣布发布Java SE 7,这是Java在甲骨文名下发布的第一个版本. 开发者在博客上称它是五年全世界Java社区协助成果,是Java发展中的重要一步. Java SE 7主要新特性包括:Project Coin,提高生产力,简化编程任务;Fork/Join Framework,支持多核心处理器,简化问题分解并行执行;InvokeDynamic,使其它语言能更容易的在JVM中运行.

利用Java SE 8流处理数据II(译)

- - BlogJava-首页技术区
利用Java SE 8流处理数据. -- 结合Stream API的高级操作去表示富数据处理查询. 本文是 Java Magazine 201405/06刊中的一篇文章,也是文章系列"利用Java SE 8流处理数据"中的第二篇,它基于flatMap()和collect()介绍了Java流的高级用法(2014.08.15最后更新).

Java SE 7 Exception的使用

- - ITeye博客
Java SE 7 Exception的使用. 在Java SE 7 中,作为Project Coin项目中众多有用的细小语言变化之一的加强型异常处理,现在来学习如何利用它. 在这边文章中,我们所涉及的一些变化是作为Java平台标准版7(Java SE 7)所发布,在JSR334(Java Specification Request)有详细的说明.

Java SE 6 故障排除指南 – 3、内存泄露

- - 码蜂笔记
如果你的应用程序执行的时间越来越长,或如果操作系统执行越来越慢,这可能是内存泄露的指示. 换句话说,虚拟内存被分配但在不需要时没有归还. 最终应用程序或系统没有可用内存,应用程序非正常终止. 这篇文章提供了一些涉及内存泄露的问题诊断的建议. 3.1 OutOfMemoryError 的含义. 一个最常见的内存泄露的指示是 java.lang.OutOfMemoryError 错误.

[译]Java SE 8 新特性之旅 : Java开发世界的大变动

- - 上善若水 厚德载物
我很自豪的成为了adopt-OpenJDK的一员,像其他专业团队成员一样,但是我只刚加入了8个月,我们一同经历了Java SE 8 的开发、编译、编码、讨论……等等,直到JDK上线. Java SE 8发布于2014年3月18日,现在可供下载使用. 我很高兴发布这一系列“Java SE 8 新特性之旅”,我会写一些例子来简化Java SE 8知识的获取、开发经验、新特性和API,然后.

Java SE 6 故障排除指南 – 4、系统崩溃故障排除

- - 码蜂笔记
崩溃或致命错误导致进程异常终止. 例如,崩溃可能是由于HotSpot VM、系统库、Java SE 库或API、程序本地代码、甚至操作系统里的 bug. 极端因素如操作系统资源耗尽也可以导致崩溃. 因 HotSpot VM 或 Java SE库代码导致的崩溃是罕见的. 有时候可以变通崩溃直到导致崩溃的源被诊断和修复(也就是可以避开崩溃).

Oracle Java SE 8 发行版更新:限制商业或生产用途

- - 开源中国社区最新新闻
Oracle Java SE 8 发行版更新. Oracle Java SE 8 的公开更新仍面向单独的个人使用提供,至少持续至 2020 年底. 2019 年 1 月以后发布的 Oracle Java SE 8 公开更新将不向没有商用许可证的业务、商用或生产用途提供. 如果您是使用者,将 Java 用于单独的个人用途,则至少在 2020 年底之前,您对 Oracle Java SE 8 更新仍具有与现在相同的访问权限.

Java SE 6 故障排除指南 – 5、挂起或循环进程故障排除

- - 码蜂笔记
本章为挂起或循环进程的故障排除在特定程序上提供了信息和指导. 问题在涉及挂起或循环进程时发生. 挂起可能因为多种原因发生,但经常是源于程序代码、API代码或库代码里的死锁. 挂起甚至是因为 HotSpot VM的bug. 有时候,一个表面上是挂起的可能是个循环. 例如,VM进程里的bug导致一个或多个线程进入死循环,会消耗掉所有可得CPU周期.

Spring Framework 4.0相关计划公布---包括对于Java SE 8 和Groovy2的支持

- - InfoQ cn
VMware公司旗下的SpringSource团队近日宣布了Spring Framework 4.0的相关计划,这是Spring框架的下一个升级版本,新的特性包括了对Java SE 8,Groovy 2,Java EE 7部分功能和WebSockets的支持. 在介绍Spring Framework 3.2版本的.

苹果发布2020年款iPhone SE 售价399美元

- - IT瘾-cnbeta
该机将于本周五(4月17日)开始预购,4月24日发货. 届时将有128GB机型提供,售价449美元,256GB机型售价549美元. 与其他iPhone一样,它将附带免费一年的Apple TV Plus,它将有黑色、白色和产品红三种颜色. iPhone SE本质上是一款拥有更好的摄像头和处理器的iPhone 8,价格也更低,虽然它沿用相对较老的设计, 但这款iPhone SE有 苹果的A13 Bionic芯片, 与最新的iPhone 11和11 Pro是同一个型号.