Spark编程指南笔记
本文是参考Spark官方编程指南(Spark 版本为1.2)整理出来的学习笔记,主要是用于加深对 Spark 的理解,并记录一些知识点。
1. 一些概念
每一个 Spark 的应用,都是由一个驱动程序构成,它运行用户的 main 函数,在一个集群上执行各种各样的并行操作。
Spark 提出的最主要抽象概念是 弹性分布式数据集
,它是一个有容错机制(划分到集群的各个节点上)并可以被并行操作的元素集合。目前有两种类型的RDD:
-
并行集合
:接收一个已经存在的 Scala 集合,然后进行各种并行计算。 -
外部数据集
:外部存储系统,例如一个共享的文件系统,HDFS、HBase以及任何支持 Hadoop InputFormat 的数据源。
这两种类型的 RDD 都可以通过相同的方式进行操作。用户可以让 Spark 保留一个 RDD 在内存中,使其能在并行操作中被有效的重复使用,并且,RDD 能自动从节点故障中恢复。
Spark 的第二个抽象概念是 共享变量
,可以在并行操作中使用。在默认情况下,Spark 通过不同节点上的一系列任务来运行一个函数,它将每一个函数中用到的变量的拷贝传递到每一个任务中。有时候,一个变量需要在任务之间,或任务与驱动程序之间被共享。
Spark 支持两种类型的共享变量: 广播变量
,可以在内存的所有的结点上缓存变量; 累加器
:只能用于做加法的变量,例如计数或求和。
2. 编写程序
初始化 Spark
Spark 程序需要做的第一件事情,就是创建一个 SparkContext 对象,它将告诉 Spark 如何访问一个集群。这个通常是通过下面的构造器来实现的:
new SparkContext(master, appName, [sparkHome], [jars])
参数说明:
-
master
:用于指定所连接的 Spark 或者 Mesos 集群的 URL。 -
appName
:应用的名称,将会在集群的 Web 监控 UI 中显示。 -
sparkHome
:可选,你的集群机器上 Spark 的安装路径(所有机器上路径必须一致)。 -
jars
:可选,在本地机器上的 JAR 文件列表,其中包括你应用的代码以及任何的依赖,Spark 将会把他们部署到所有的集群结点上。
在 python 中初始化,示例代码如下:
//conf = SparkContext("local", "Hello Spark")
conf = SparkConf().setAppName("Hello Spark").setMaster("local")
sc = SparkContext(conf=conf)
说明:如果部署到集群,在分布式模式下运行,最后两个参数是必须的。
第一个参数可以是以下任一种形式:
Master URL | 含义 |
---|---|
loca l |
默认值,使用一个 Worker 线程本地化运行(完全不并行) |
local[K] |
使用 K 个 Worker 线程本地化运行(理想情况下,K 应该根据运行机器的 CPU 核数设定) |
spark://HOST:PORT |
连接到指定的 Spark 单机版集群 master 进程所在的主机和端口,端口默认是7077 |
mesos://HOST:PORT |
连接到指定的 Mesos 集群。host 参数是Moses master的hostname。端口默认是5050 |
如果你在一个集群上运行 spark-shell,则 master 参数默认为 local
,在启动之前你可以通过修改配置文件指定 ADD_JAR
环境变量将 JAR 文件们加载在集群上,这个变量需要包括一个用逗号分隔的 JAR 文件列表。
运行代码
运行代码有几种方式,一是通过 spark-shell 来运行 scala 代码,一是编写 java 代码并打成包以 spark on yarn 方式运行,还有一种是通过 pyspark 来运行 python 代码。
更多内容,参考 Spark安装和使用。
3. 弹性分布式数据集
3.1 并行集合
并行集合是通过调用 SparkContext 的 parallelize
方法,在一个已经存在的 Scala 集合上创建一个 Seq 对象。
parallelize 方法还可以接受一个参数 slices
,表示数据集切分的份数。Spark 将会在集群上为每一份数据起一个任务。典型地,你可以在集群的每个 CPU 上分布 2-4个 slices。一般来说,Spark 会尝试根据集群的状况,来自动设定 slices 的数目,当然,你也可以手动设置。
Scala 示例程序:
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> var distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
scala> distData.reduce((a, b) => a + b)
res4: Int = 15
Java 示例程序:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Integer sum=distData.reduce((a, b) -> a + b);
Python 示例程序:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.reduce(lambda a, b: a + b)
3.2 外部数据源
Spark可以从存储在 HDFS,或者 Hadoop 支持的其它文件系统(包括本地文件,Amazon S3, Hypertable, HBase 等等)上的文件创建分布式数据集。Spark 支持 TextFile
、 SequenceFiles
以及其他任何 Hadoop InputFormat
格式的输入。
TextFile 的 RDD 可以通过下面方式创建,该方法接受一个文件的 URI 地址,该地址可以是本地路径,或者 hdfs://
、 s3n://
等 URL 地址。
// scala 语法
val distFile = sc.textFile("data.txt")
// java 语法
JavaRDD<String> distFile = sc.textFile("data.txt");
// python 语法
distFile = sc.textFile("data.txt")
一些说明:
- 引用的路径必须是绝对路径,并且必须在每一个 worker 节点上保持一致。
- 输入的地址可以是一个目录,也可以是正则匹配表达式,也可以是压缩的文件。
- textFile 方法也可以通过输入一个可选的第二参数,来控制文件的分片数目。默认情况下,Spark 为每一块文件创建一个分片(HDFS 默认的块大小为64MB),但是你也可以通过传入一个更大的值,来指定一个更高的片值,但不能指定一个比块数更小的片值。
除了 TextFile,Spark 还支持其他格式的输入:
-
SparkContext.wholeTextFiles
方法可以读取一个包含多个小文件的目录,并以 filename,content 键值对的方式返回结果。 - 对于 SequenceFiles,可以使用 SparkContext 的
sequenceFile[K, V]
` 方法创建。像 IntWritable 和 Text 一样,它们必须是 Hadoop 的 Writable 接口的子类。另外,对于几种通用 Writable 类型,Spark 允许你指定原生类型来替代。例如:sequencFile[Int, String] 将会自动读取 IntWritable 和 Texts。 - 对于其他类型的 Hadoop 输入格式,你可以使用
SparkContext.hadoopRDD
方法,它可以接收任意类型的 JobConf 和输入格式类,键类型和值类型。按照像 Hadoop 作业一样的方法设置输入源就可以了。 -
RDD.saveAsObjectFile
和SparkContext.objectFile
提供了以 Java 序列化的简单方式来保存 RDD。虽然这种方式没有 Avro 高效,但也是一种简单的方式来保存任意的 RDD。
3.3 RDD 操作
RDD支持两种操作:
-
转换
:从现有的数据集创建一个新的数据集; -
动作
:在数据集上运行计算后,返回一个值给驱动程序。
例如,map 是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果,而 reduce 是一种动作,通过一些函数将所有的元素叠加起来,并将最终结果返回给运行程序。
Spark 中的 所有转换都是惰性的
,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集上的这些转换动作。只有当发生一个要求返回结果给运行程序的动作时,这些转换才会真正运行。
默认情况下,每一个转换过的 RDD 都会在你运行一个动作时被重新计算。不过,你也可以使用 persist
或者 cache
方法,持久化一个 RDD 在内存中。在这种情况下,Spark 将会在集群中,保存相关元素,下次你查询这个 RDD 时,它将能更快速访问。除了持久化到内存,Spark 也支持在磁盘上持久化数据集,或在节点之间复制数据集。
Scala 示例:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
Java 示例:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
Python 示例:
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
代码说明:
- 第一行定义了一个基础 RDD,但并没有开始载入内存,仅仅将 lines 指向了这个file
- 第二行也仅仅定义了 linelengths 是作为 map 的结果,但也没有开始运行 map 这个过程
- 直到第三句话才开始运行,各个 worker 节点开始运行自己的 map、reduce 过程
你也可以调用 lineLengths.persist()
来持久化 RDD。
除了使用 lambda 表达式,也可以通过函数来运行转换或者动作,使用函数需要注意局部变量的作用域问题。
例如下面的 Python 代码中的 field 变量:
class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + x)
如果使用 Java 语言,则需要用到匿名内部类:
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
Spark 也支持键值对的操作,这在分组和聚合操作时候用得到。定义一个键值对对象时,需要自定义该对象的 equals() 和 hashCode() 方法。
在 Scala 中有一个 Tuple2 对象表示键值对,这是一个内置的对象,通过 (a,b)
就可以创建一个 Tuple2 对象。在你的程序中,通过导入 org.apache.spark.SparkContext._
就可以对 Tuple2 进行操作。对键值对的操作方法,可以查看 PairRDDFunctions
下面是一个用 scala 统计单词出现次数的例子:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
接下来,你还可以执行 counts.sortByKey()
、 counts.collect()
等操作。
如果用 Java 统计,则代码如下:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
用 Python 统计,代码如下:
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
测试
现在来结合上面的例子实现一个完整的例子。下面,我们来 分析 Nginx 日志中状态码出现次数,并且将结果按照状态码从小到大排序。
先将测试数据上传到 hdfs:
$ hadoop fs -put access.log
然后,编写一个 python 文件,保存为 SimpleApp.py:
from pyspark import SparkContext
logFile = "access.log"
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
counts = logData.map(lambda line: line.split()[8]).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey(lambda x: x)
# This is just a demo on how to bring all the sorted data back to a single node.
# In reality, we wouldn't want to collect all the data to the driver node.
output = counts.collect()
for (word, count) in output:
print "%s: %i" % (word, count)
counts.saveAsTextFile("spark_results")
sc.stop()
接下来,运行下面代码:
$ spark-submit --master local[4] SimpleApp.py
运行成功之后,你会在终端看到以下输出:
200: 6827
206: 120
301: 7
304: 10
403: 38
404: 125
416: 1
并且,在hdfs 上 /user/spark/spark_results/part-00000 内容如下:
(u'200', 6827)
(u'206', 120)
(u'301', 7)
(u'304', 10)
(u'403', 38)
(u'404', 125)
(u'416', 1)
其实,这个例子和官方提供的例子很相像,具体请看 wordcount.py。
常见的转换
转换 | 含义 |
---|---|
map(func) |
返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成 |
filter(func) |
返回一个新数据集,由经过func函数计算后返回值为 true 的输入元素组成 |
flatMap(func) |
类似于 map,但是每一个输入元素可以被映射为0或多个输出元素,因此 func 应该返回一个序列 |
mapPartitions(func) |
类似于 map,但独立地在 RDD 的每一个分块上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] ⇒ Iterator[U] |
mapPartitionsWithSplit(func) |
类似于 mapPartitions, 但 func 带有一个整数参数表示分块的索引值。因此在类型为 T的RDD上运行时,func 的函数类型必须是 (Int, Iterator[T]) ⇒ Iterator[U] |
sample(withReplacement,fraction, seed) |
根据 fraction 指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed 用于指定随机数生成器种子 |
union(otherDataset) |
返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成 |
distinct([numTasks])) |
返回一个包含源数据集中所有不重复元素的新数据集 |
groupByKey([numTasks]) |
在一个键值对的数据集上调用,返回一个 (K,Seq[V]) 对的数据集 。注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的 numTasks 参数来改变它 |
reduceByKey(func, [numTasks]) |
在一个键值对的数据集上调用时,返回一个键值对的数据集,使用指定的 reduce 函数,将相同 key 的值聚合到一起。类似 groupByKey,reduce 任务个数是可以通过第二个可选参数来配置的 |
sortByKey([ascending], [numTasks]) |
在一个键值对的数据集上调用,K 必须实现 Ordered 接口,返回一个按照 Key 进行排序的键值对数据集。升序或降序由 ascending 布尔参数决定 |
join(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W) 类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的 (K, (V, W)) 数据集 |
cogroup(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W) 的数据集上调用,返回一个 (K, Seq[V], Seq[W]) 元组的数据集。这个操作也可以称之为 groupwith |
cartesian(otherDataset) |
笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U) 对数据集(两两的元素对) |
pipe(command, [envVars]) |
对 RDD 进行管道操作 |
coalesce(numPartitions) |
减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 |
repartition(numPartitions) |
重新给 RDD 分区 |
repartitionAndSortWithinPartitions(partitioner) |
重新给 RDD 分区,并且每个分区内以记录的 key 排序 |
常用的动作
常用的动作列表
动作 | 含义 |
---|---|
reduce(func) |
通过函数 func 聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的被并行执行。 |
collect() |
在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用 filter 或者其它操作并返回一个足够小的数据子集后再使用会比较有用。 |
count() |
返回数据集的元素的个数。 |
first() |
返回数据集的第一个元素,类似于 take(1) |
take(n) |
返回一个由数据集的前 n 个元素组成的数组。注意,这个操作目前并非并行执行,而是由驱动程序计算所有的元素 |
takeSample(withReplacement,num, seed) |
返回一个数组,在数据集中随机采样 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定的随机数生成器种子 |
takeOrdered(n, [ordering]) |
返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) |
将数据集的元素,以 textfile 的形式,保存到本地文件系统,HDFS或者任何其它 hadoop 支持的文件系统。对于每个元素,Spark 将会调用 toString 方法,将它转换为文件中的文本行 |
saveAsSequenceFile(path) (Java and Scala) |
将数据集的元素,以 Hadoop sequencefile 的格式保存到指定的目录下 |
saveAsObjectFile(path) (Java and Scala) |
将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() |
对(K,V)类型的 RDD 有效,返回一个 (K,Int) 对的 Map,表示每一个key对应的元素个数 |
foreach(func) |
在数据集的每一个元素上,运行函数 func 进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase |
3.4 RDD持久化
Spark 最重要的一个功能,就是在不同操作间,持久化(或缓存)一个数据集在内存中,这将使得后续的动作变得更加迅速。缓存是用 Spark 构建迭代算法的关键。 使用以下两种方法可以标记要缓存的 RDD:
lineLengths.persist()
lineLengths.cache()
取消缓存则用:
lineLengths.unpersist()
每一个RDD都可以用不同的保存级别进行保存,通过将一个 org.apache.spark.storage.StorageLevel
对象传递给 persist(self, storageLevel)
可以控制 RDD 持久化到磁盘、内存或者是跨节点复制等等。 cache()
方法是使用默认存储级别的快捷方法,也就是 StorageLevel.MEMORY_ONLY
。 完整的可选存储级别如下:
存储级别 | 意义 |
---|---|
MEMORY_ONLY |
默认的级别, 将 RDD 作为反序列化的的对象存储在 JVM 中。如果不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算 |
MEMORY_AND_DISK |
将 RDD 作为反序列化的的对象存储在 JVM 中。如果不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取 |
MEMORY_ONLY_SER |
将 RDD 作为序列化的的对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化的空间利用率更高,尤其当使用fast serializer,但在读取时会比较占用CPU |
MEMORY_AND_DISK_SER |
与 MEMORY_ONLY_SER 相似,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算 |
DISK_ONLY |
只将 RDD 分区存储在硬盘上 |
MEMORY_ONLY_2 、 MEMORY_AND_DISK_2 等 |
与上述的存储级别一样,但是将每一个分区都复制到两个集群结点上 |
OFF_HEAP |
开发中 |
Spark 的不同存储级别,旨在满足内存使用和 CPU 效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:
- 如果你的 RDD 可以很好的与默认的存储级别契合,就不需要做任何修改了。这已经是 CPU 使用效率最高的选项,它使得 RDD的操作尽可能的快。
- 如果不行,试着使用
MEMORY_ONLY_SER
并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。 - 尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。
- 如果你想有快速故障恢复能力,使用复制存储级别。例如:用 Spark 来响应web应用的请求。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在 RDD 上持续的运行任务,而不需要等待丢失的分区被重新计算。
- 如果你想要定义你自己的存储级别,比如复制因子为3而不是2,可以使用
StorageLevel
单例对象的apply()
方法。