Apache Beam 快速入门(Python 版) | 张吉的博客

标签: | 发表时间:2017-09-14 05:58 | 作者:
分享到:
出处:http://shzhangji.com

Apache Beam是一种大数据处理标准,由谷歌于 2016 年创建。它提供了一套统一的 DSL 用以处理离线和实时数据,并能在目前主流的大数据处理平台上使用,包括 Spark、Flink、以及谷歌自身的商业套件 Dataflow。Beam 的数据模型基于过去的几项研究成果:FlumeJavaMillwheel,适用场景包括 ETL、统计分析、实时计算等。目前,Beam 提供了两种语言的 SDK:Java、Python。本文将讲述如何使用 Python 编写 Beam 应用程序。

Apache Beam Pipeline

安装 Apache Beam

Apache Beam Python SDK 必须使用 Python 2.7.x 版本,你可以安装pyenv来管理不同版本的 Python,或者直接从源代码编译安装(需要支持 SSL)。之后,你便可以在 Python 虚拟环境中安装 Beam SDK 了:

1
2
3
$ virtualenv venv --distribute
$ source venv/bin/activate
(venv) $ pip install apache-beam

Wordcount 示例

Wordcount 是大数据领域的 Hello World,我们来看如何使用 Beam 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
from__future__importprint_function
importapache_beamasbeam
fromapache_beam.options.pipeline_optionsimportPipelineOptions
withbeam.Pipeline(options=PipelineOptions())asp:
lines = p |'Create'>> beam.Create(['cat dog','snake cat','dog'])
counts = (
lines
|'Split'>> (beam.FlatMap(lambdax: x.split(' '))
.with_output_types(unicode))
|'PairWithOne'>> beam.Map(lambdax: (x,1))
|'GroupAndSum'>> beam.CombinePerKey(sum)
)
counts |'Print'>> beam.ParDo(lambda(w, c): print('%s: %s'% (w, c)))

运行脚本,我们便可得到每个单词出现的次数:

1
2
3
4
(venv) $ python wordcount.py
cat: 2
snake: 1
dog: 2

Apache Beam 有三个重要的基本概念:Pipeline、PCollection、以及 Transform。

  • Pipeline(管道)用以构建数据集和处理过程的 DAG(有向无环图)。我们可以将它看成 MapReduce 中的Job或是 Storm 的Topology
  • PCollection是一种数据结构,我们可以对其进行各类转换操作,如解析、过滤、聚合等。它和 Spark 中的RDD概念类似。
  • Transform(转换)则用于编写业务逻辑。通过它,我们可以将一个 PCollection 转换成另一个 PCollection。Beam 提供了许多内置的转换函数,我们将在下文讨论。

在本例中,PipelinePipelineOptions用来创建一个管道。通过with关键字,上下文管理器会自动调用Pipeline.runwait_until_finish方法。

1
[Output PCollection] = [Input PCollection] | [Label] >> [Transform]

|是 Beam 引入的新操作符,用来添加一个转换。每次转换都可以定义一个唯一的标签,默认由 Beam 自动生成。转换能够串联,我们可以构建出不同形态的转换流程,它们在运行时会表示为一个 DAG。

beam.Create用来从内存数据创建出一个 PCollection,主要用于测试和演示。Beam 提供了多种内置的输入源(Source)和输出目标(Sink),可以接收和写入有界(Bounded)或无界(Unbounded)的数据,并且能进行自定义。

beam.Map是一种一对一的转换,本例中我们将一个个单词转换成形如(word, 1)的元组。beam.FlatMap则是MapFlatten的结合体,通过它,我们将包含多个单词的数组合并成一个一维的数组。

CombinePerKey的输入源是一系列的二元组(2-element tuple)。这个操作会将元素的第一个元素作为键进行分组,并将相同键的值(第二个元素)组成一个列表。最后,我们使用beam.ParDo输出统计结果。这个转换函数比较底层,我们会在下文详述。

输入与输出

目前,Beam Python SDK 对输入输出的支持十分有限。下表列出了现阶段支持的数据源(资料来源):

语言文件系统消息队列数据库
JavaHDFS

TextIO

XML
AMQP

Kafka

JMS
Hive

Solr

JDBC
Pythontextio

avroio

tfrecordio
-Google Big Query

Google Cloud Datastore

这段代码演示了如何使用textio对文本文件进行读写:

1
2
lines = p |'Read'>> beam.io.ReadFromText('/path/to/input-*.csv')
lines |'Write'>> beam.io.WriteToText('/path/to/output', file_name_suffix='.csv')

通过使用通配符,textio可以读取多个文件。我们还可以从不同的数据源中读取文件,并用Flatten方法将多个PCollection合并成一个。输出文件默认也会是多个,因为 Beam Pipeline 是并发执行的,不同的进程会写入独立的文件。

转换函数

Beam 中提供了基础和上层的转换函数。通常我们更偏向于使用上层函数,这样就可以将精力聚焦在实现业务逻辑上。下表列出了常用的上层转换函数:

转换函数功能含义
Create(value)基于内存中的集合数据生成一个 PCollection。
Filter(fn)使用fn函数过滤 PCollection 中的元素。
Map(fn)使用fn函数做一对一的转换处理。
FlatMap(fn)功能和Map类似,但是fn需要返回一个集合,里面包含零个或多个元素,最终FlatMap会将这些集合合并成一个 PCollection。
Flatten()合并多个 PCollection。
Partition(fn)将一个 PCollection 切分成多个分区。fn可以是PartitionFn或一个普通函数,能够接受两个参数:elementnum_partitions
GroupByKey()输入源必须是使用二元组表示的键值对,该方法会按键进行分组,并返回一个(key, iter<value>)的序列。
CoGroupByKey()对多个二元组 PCollection 按相同键进行合并,如输入的是(k, v)(k, w),则输出(k, (iter<v>, iter<w>))
RemoveDuplicates()对 PCollection 的元素进行去重。
CombinePerKey(fn)功能和GroupByKey类似,但会进一步使用fn对值列表进行合并。fn可以是一个CombineFn,或是一个普通函数,接收序列并返回结果,如summax函数等。
CombineGlobally(fn)使用fn将整个 PCollection 合并计算成单个值。

Callable, DoFn, ParDo

可以看到,多数转换函数都会接收另一个函数(Callable)做为参数。在 Python 中,Callable可以是一个函数、类方法、Lambda 表达式、或是任何包含__call__方法的对象实例。Beam 会将这些函数包装成一个DoFn类,所有转换函数最终都会调用最基础的ParDo函数,并将DoFn传递给它。

我们可以尝试将lambda x: x.split(' ')这个表达式转换成DoFn类:

1
2
3
4
5
classSplitFn(beam.DoFn):
defprocess(self, element):
returnelement.split(' ')
lines | beam.ParDo(SplitFn())

ParDo转换和FlatMap的功能类似,只是它的fn参数必须是一个DoFn。除了使用return,我们还可以用yield语句来返回结果:

1
2
3
4
classSplitAndPairWithOneFn(beam.DoFn):
defprocess(self, element):
forwordinelement.split(' '):
yield(word,1)

合并函数

合并函数(CombineFn)用来将集合数据合并计算成单个值。我们既可以对整个 PCollection 做合并(CombineGlobally),也可以计算每个键的合并结果(CombinePerKey)。Beam 会将普通函数(Callable)包装成CombineFn,这些函数需要接收一个集合,并返回单个结果。需要注意的是,Beam 会将计算过程分发到多台服务器上,合并函数会被多次调用来计算中间结果,因此需要满足交换律结合律summinmax是符合这样的要求的。

Beam 提供了许多内置的合并函数,如计数、求平均值、排序等。以计数为例,下面两种写法都可以用来统计整个 PCollection 中元素的个数:

1
2
lines | beam.combiners.Count.Globally()
lines | beam.CombineGlobally(beam.combiners.CountCombineFn())

其他合并函数可以参考 Python SDK 的官方文档(链接)。我们也可以自行实现合并函数,只需继承CombineFn,并实现四个方法。我们以内置的Mean平均值合并函数的源码为例:

apache_beam/transforms/combiners.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
classMeanCombineFn(core.CombineFn):
defcreate_accumulator(self):
"""创建一个“本地”的中间结果,记录合计值和记录数。"""
return(0,0)
defadd_input(self,(sum_, count), element):
"""处理新接收到的值。"""
returnsum_ + element, count +1
defmerge_accumulators(self, accumulators):
"""合并多个中间结果。"""
sums, counts = zip(*accumulators)
returnsum(sums), sum(counts)
defextract_output(self,(sum_, count)):
"""计算平均值。"""
ifcount ==0:
returnfloat('NaN')
returnsum_ / float(count)

复合转换函数

我们简单看一下上文中使用到的beam.combiners.Count.Globally的源码(链接),它继承了PTransform类,并在expand方法中对 PCollection 应用了转换函数。这会形成一个小型的有向无环图,并合并到最终的 DAG 中。我们称其为复合转换函数,主要用于将相关的转换逻辑整合起来,便于理解和管理。

1
2
3
4
classCount(object):
classGlobally(ptransform.PTransform):
defexpand(self, pcoll):
returnpcoll | core.CombineGlobally(CountCombineFn())

更多内置的复合转换函数如下表所示:

复合转换函数功能含义
Count.Globally()计算元素总数。
Count.PerKey()计算每个键的元素数。
Count.PerElement()计算每个元素出现的次数,类似 Wordcount。
Mean.Globally()计算所有元素的平均值。
Mean.PerKey()计算每个键的元素平均值。
Top.Of(n, reverse)获取 PCollection 中最大或最小的n个元素,另有 Top.Largest(n), Top.Smallest(n).
Top.PerKey(n, reverse)获取每个键的值列表中最大或最小的n个元素,另有 Top.LargestPerKey(n), Top.SmallestPerKey(n)
Sample.FixedSizeGlobally(n)随机获取n个元素。
Sample.FixedSizePerKey(n)随机获取每个键下的n个元素。
ToList()将 PCollection 合并成一个列表。
ToDict()将 PCollection 合并成一个哈希表,输入数据需要是二元组集合。

时间窗口

在处理事件数据时,如访问日志、用户点击流,每条数据都会有一个事件时间属性,而通常我们会按事件时间对数据进行分组统计,这些分组即时间窗口。在 Beam 中,我们可以定义不同的时间窗口类型,能够支持有界和无界数据。由于 Python SDK 暂时只支持有界数据,我们就以一个离线访问日志文件作为输入源,统计每个时间窗口的记录条数。对于无界数据,概念和处理流程也是类似的。

1
2
3
4
5
64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /edit HTTP/1.1" 401 12846
64.242.88.10 - - [07/Mar/2004:16:06:51 -0800] "GET /rdiff HTTP/1.1" 200 4523
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /hsdivision HTTP/1.1" 200 6291
64.242.88.10 - - [07/Mar/2004:16:11:58 -0800] "GET /view HTTP/1.1" 200 7352
64.242.88.10 - - [07/Mar/2004:16:20:55 -0800] "GET /view HTTP/1.1" 200 5253

logmining.py的完整源码可以在 GitHub(链接)中找到:

1
2
3
4
5
6
7
8
9
10
lines = p |'Create'>> beam.io.ReadFromText('access.log')
windowed_counts = (
lines
|'Timestamp'>> beam.Map(lambdax: beam.window.TimestampedValue(
x, extract_timestamp(x)))
|'Window'>> beam.WindowInto(beam.window.SlidingWindows(600,300))
|'Count'>> (beam.CombineGlobally(beam.combiners.CountCombineFn())
.without_defaults())
)
windowed_counts = windowed_counts | beam.ParDo(PrintWindowFn())

首先,我们需要为每一条记录附加上时间戳。自定义函数extract_timestamp用以将日志中的时间[07/Mar/2004:16:05:49 -0800]转换成 Unix 时间戳,TimestampedValue则会将这个时间戳和对应记录关联起来。之后,我们定义了一个大小为10 分钟,间隔为5 分钟的滑动窗口(Sliding Window)。从零点开始,第一个窗口的范围是[00:00, 00:10),第二个窗口的范围是[00:05, 00:15),以此类推。所有窗口的长度都是10 分钟,相邻两个窗口之间相隔5 分钟。滑动窗口和固定窗口(Fixed Window)不同,因为相同的元素可能会落入不同的窗口中参与计算。最后,我们使用一个合并函数计算每个窗口中的记录数。通过这个方法得到前五条记录的计算结果为:

1
2
3
4
5
[2004-03-08T00:00:00Z, 2004-03-08T00:10:00Z) @ 2
[2004-03-08T00:05:00Z, 2004-03-08T00:15:00Z) @ 4
[2004-03-08T00:10:00Z, 2004-03-08T00:20:00Z) @ 2
[2004-03-08T00:15:00Z, 2004-03-08T00:25:00Z) @ 1
[2004-03-08T00:20:00Z, 2004-03-08T00:30:00Z) @ 1

在无界数据的实时计算过程中,事件数据的接收顺序是不固定的,因此需要利用 Beam 的水位线和触发器机制来处理延迟数据(Late Data)。这个话题比较复杂,而且 Python SDK 尚未支持这些特性,感兴趣的读者可以参考 Stream101102这两篇文章。

Pipeline 运行时

上文中提到,Apache Beam 是一个数据处理标准,只提供了 SDK 和 API,因而必须使用 Spark、Flink 这样的计算引擎来运行它。下表列出了当前支持 Beam Model 的引擎,以及他们的兼容程度:

Beam 运行时能力矩阵

图片来源

参考资料

相关 [apache beam python] 推荐:

Apache Beam 快速入门(Python 版) | 张吉的博客

- -
Apache Beam是一种大数据处理标准,由谷歌于 2016 年创建. 它提供了一套统一的 DSL 用以处理离线和实时数据,并能在目前主流的大数据处理平台上使用,包括 Spark、Flink、以及谷歌自身的商业套件 Dataflow. Beam 的数据模型基于过去的几项研究成果:FlumeJava、Millwheel,适用场景包括 ETL、统计分析、实时计算等.

Apache Beam:一个开源的统一的分布式数据处理编程库

- - 简单之美
Apache Beam是一个开源的数据处理编程库,由Google共享给Apache的项目,前不久刚刚成为Apache TLP项目. 它提供了一个高级的、统一的编程模型,允许我们通过构建Pipeline的方式实现批量、流数据处理,并且构建好的Pipeline能够运行在底层不同的执行引擎上. 刚刚接触该开源项目时,我的第一感觉就是:在编程API的设计上,数据集及其操作的抽象有点类似Apache Crunch(MapReduce Pipeline编程库)项目;而在支持统一数据处理模型上,能够让人想到Apache Flink项目.

顛覆性的傳輸功能:Android Beam

- daviddu - Android 資訊雜誌 android-hk.com
以往要分享網上資訊,通常都以電郵、短訊方式將相關 URL 發送給朋友,不過 Android 4.0 中一項創新的傳輸功能「Android Beam」出現,將會顛覆傳統要靠著第三者的傳輸模式. 「Android Beam」其實與 HP 之前發表的 Touch-to-share 功能十分相似,只要雙方的手機都內建了 NFC 技術,然後碰一碰機背,並且點擊「Touch to Beam」按鍵,你想要分享網頁就立即傳輸到對方的手機上了,而且同時還支援連絡人、Youtube 影片、地圖及 Android Market 頁面等內容分享.

Apache Shiro 介绍

- - CSDN博客推荐文章
什么是Apache Shiro?. Apache shiro 是一个强大而灵活的开源安全框架,可清晰地处理身份认证、授权、会话(session)和加密. Apache Shiro最主要的初衷是为了易用和易理解,处理安全问题可能非常复杂甚至非常痛苦,但并非一定要如此. 一个框架应该尽可能地将复杂的问题隐藏起来,提供清晰直观的API使开发者可以很轻松地开发自己的程序安全代码.

dropbox讲python

- chuang - Initiative
dropbox定制优化CPython虚拟机,自己搞了个malloc调度算法. 那个 !!!111cos(0). 期待这次PyCon China 2011.

Python调试

- - 企业架构 - ITeye博客
原文地址: http://blog.csdn.net/xuyuefei1988/article/details/19399137. 1、下面网上收罗的资料初学者应该够用了,但对比IBM的Python 代码调试技巧:. IBM:包括 pdb 模块、利用 PyDev 和 Eclipse 集成进行调试、PyCharm 以及 Debug 日志进行调试:.

Google 推出「Android Beam」基於 NFC 技術的手機分享應用(影片)

- 小趴 八足趴 八足 ramener - Engadget 中文版
其實 Google 早就已經做好在行動裝置平台擁抱 NFC 的準備,由電子錢包開始現在來到了手機間的分享技術. 在 Samsung 於香港的發表會上,這個在 Ice Cream Sandwich 系統中稱為「Android Beam」的全新功能,其實與 HP 的 Touch-to-share 功能十分相似.

Google 推出「Android Beam」基于 NFC 技术的手机分享应用(影片)

- austin - Engadget 中国版
其实 Google 早就已经做好在行动装置平台拥抱 NFC 的准备,由电子货币包开始现在来到了手机间的分享技术. 在 Samsung 于香港的发表会上,这个在 Ice Cream Sandwich 系统中称为「Android Beam」的全新功能,其实与 HP 的 Touch-to-share 功能十分相似.

Android Beam 告诉我们未来近距离数据传输新方式(视频)

- 云飞风起 - Engadget 中国版
你是否还记得惠普展示 TouchPad 和 Pre 3 的触控式数据交换么. 你初次看到的时候一定觉得很酷,不过现在想同时拥有 TouchPad 和 Pre 3 是一件很困难的事情. 现在 Android Beam 给了我们新答案,这个方式通过两台 Ice Cream Sandwich 系统手机设备的 NFC 技术进行.

Apache防止攻击

- - 小彰
为了防止恶意用户对Apache进行攻击,我们需要安装mod_security这个安全模块. mod_security 1.9.x模块的下载与安装. 下载地址: http://www.modsecurity.org/download/index.html. 建议使用1.9.x,因为2.x的配置指令与1.x完全不同,解压后进入解压目录,执行:.