scala 开发spark程序

标签: scala 开发 spark | 发表时间:2014-08-24 10:56 | 作者:windyrails
出处:http://www.iteye.com

Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。如果你对Scala语言还不太熟悉,可以阅读网络教程 A Scala Tutorial for Java Programmers或者相关 Scala书籍进行学习。

 

本文将介绍3个Scala Spark编程实例,分别是WordCount、TopK和SparkJoin,分别代表了Spark的三种典型应用。

1. WordCount编程实例

WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次数,编写步骤如下:

步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:

1
2
val sc = new SparkContext(args( 0 ), "WordCount" ,
     System.getenv( "SPARK_HOME" ), Seq(System.getenv( "SPARK_TEST_JAR" )))

步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是Hadoop中的TextInputFormat解析输入数据,举例如下:

1
val textFile = sc.textFile(args( 1 ))

当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:

1
2
val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]]
var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text])

或者直接创建一个HadoopRDD对象:

1
2
var hadoopRdd = new HadoopRDD(sc, conf,
      classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text])

步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:

1
2
3
     val result = hadoopRdd.flatMap{
         case (key, value)  = > value.toString().split( "\\s+" );
}.map(word = > (word, 1 )). reduceByKey ( _ + _ )

其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,这些函数的具体含义可参考: Spark Transformation

步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:

1
result.saveAsSequenceFile(args( 2 ))

当然,一般我们写Spark程序时,需要包含以下两个头文件:

1
2
import org.apache.spark. _
import SparkContext. _

WordCount完整程序已在“ Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文中进行了介绍,在次不赘述。

需要注意的是,指定输入输出文件时,需要指定hdfs的URI,比如输入目录是hdfs://hadoop-test/tmp/input,输出目录是hdfs://hadoop-test/tmp/output,其中,“hdfs://hadoop-test”是由Hadoop配置文件core-site.xml中参数fs.default.name指定的,具体替换成你的配置即可。

2. TopK编程实例

TopK程序的任务是对一堆文本进行词频统计,并返回出现频率最高的K个词。如果采用MapReduce实现,则需要编写两个作业:WordCount和TopK,而使用Spark则只需一个作业,其中WordCount部分已由前面实现了,接下来顺着前面的实现,找到Top K个词。注意,本文的实现并不是最优的,有很大改进空间。

步骤1:首先需要对所有词按照词频排序,如下:

1
2
3
val sorted = result.map {
   case (key, value) = > (value, key); //exchange key and value
}.sortByKey( true , 1 )

步骤2:返回前K个:

1
val topK = sorted.top(args( 3 ).toInt)

步骤3:将K各词打印出来:

1
topK.foreach(println)

注意,对于应用程序标准输出的内容,YARN将保存到Container的stdout日志中。在YARN中,每个Container存在三个日志文件,分别是stdout、stderr和syslog,前两个保存的是标准输出产生的内容,第三个保存的是log4j打印的日志,通常只有第三个日志中有内容。

本程序完整代码、编译好的jar包和运行脚本可以从 这里下载。下载之后,按照“ Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文操作流程运行即可。

3. SparkJoin编程实例

在推荐领域有一个著名的开放测试集是movielens给的,下载链接是: http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读: README.txt,本节给出的SparkJoin实例则通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是: ml-1m。程序代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.spark. _
import SparkContext. _
object SparkJoin {
   def main(args : Array[String]) {
     if (args.length ! = 4 ){
       println( "usage is org.test.WordCount <master> <rating> <movie> <output>" )
       return
     }
     val sc = new SparkContext(args( 0 ), "WordCount" ,
     System.getenv( "SPARK_HOME" ), Seq(System.getenv( "SPARK_TEST_JAR" )))
 
     // Read rating from HDFS file
     val textFile = sc.textFile(args( 1 ))
 
     //extract (movieid, rating)
     val rating = textFile.map(line = > {
         val fileds = line.split( "::" )
         (fileds( 1 ).toInt, fileds( 2 ).toDouble)
        })
 
     val movieScores = rating
        .groupByKey()
        .map(data = > {
          val avg = data. _ 2 .sum / data. _ 2 .size
          (data. _ 1 , avg)
        })
 
      // Read movie from HDFS file
      val movies = sc.textFile(args( 2 ))
      val movieskey = movies.map(line = > {
        val fileds = line.split( "::" )
         (fileds( 0 ).toInt, fileds( 1 ))
      }).keyBy(tup = > tup. _ 1 )
 
      // by join, we get <movie, averageRating, movieName>
      val result = movieScores
        .keyBy(tup = > tup. _ 1 )
        .join(movieskey)
        .filter(f = > f. _ 2 . _ 1 . _ 2 > 4.0 )
        .map(f = > (f. _ 1 , f. _ 2 . _ 1 . _ 2 , f. _ 2 . _ 2 . _ 2 ))
 
     result.saveAsTextFile(args( 3 ))
   }
}

你可以从 这里下载代码、编译好的jar包和运行脚本。

这个程序直接使用Spark编写有些麻烦,可以直接在 Shark上编写HQL实现,Shark是基于Spark的类似Hive的交互式查询引擎,具体可参考: Shark

4. 总结

Spark 程序设计对Scala语言的要求不高,正如Hadoop程序设计对Java语言要求不高一样,只要掌握了最基本的语法就能编写程序,且常见的语法和表达方式是很少的。通常,刚开始仿照官方实例编写程序,包括 Scala、Java和Python三种语言实例。



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [scala 开发 spark] 推荐:

scala 开发spark程序

- - 研发管理 - ITeye博客
Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情. 如果你对Scala语言还不太熟悉,可以阅读网络教程 A Scala Tutorial for Java Programmers或者相关 Scala书籍进行学习. 本文将介绍3个Scala Spark编程实例,分别是WordCount、TopK和SparkJoin,分别代表了Spark的三种典型应用.

Heroku已支持Scala

- gnawux - InfoQ中文站
今日JavaOne大会上,被SalesForce.com近期收购的平台即服务(PaaS)提供商,Heroku,宣布增加了对Scala的支持. Heroku目前正同Typesafe公司合作,共同致力于在Heroku平台中增加对Scala支持的工作. Typesafe,“Scala语言的母公司”,最初由Scala创始人Martin Odersky与他人联合创办.

Scala设计模式

- - ITeye博客
       我的话: 在国外网站上看到一篇文章,里面详细描述了很多设计模式,并且用Java及Scala两种语言描述,清晰的让我们看到各种常规的设计模式,在Scala中是如何在语言特性层面直接支持的. 基于文章很nice,我利用今天的空闲时间将其翻译,希望大家能一起学习,讨论. 翻译比较倡促,也就两小时左右,有何不当,请在下面留言指出.

Spark随谈——开发指南(译)

- - 淘宝网综合业务平台团队博客
本文翻译自官方博客,略有添加: https://github.com/mesos/spark/wiki/Spark-Programming-Guide,谢谢师允tx的校正. 希望能够给希望尝试Spark的朋友,带来一些帮助. 从高的层面来看,其实每一个Spark的应用,都是一个Driver类,通过运行用户定义的main函数,在集群上执行各种并发操作和计算.

Spark 团队推出19美元的物联网开发套件Spark Photon

- - TechCrunch 中国
原先的物联网开发套件 Spark Core 在 Kickstarter上红极一时,但它有一个致命的缺点:你必须是一名彻头彻尾的电子极客才能弄明白怎么玩这东西. 不过现在,有了 Spark Photon,门槛就没那么苛刻了. Photon 和 原先的Core 十分类似,只不过速度稍微更快一些,而且更加紧凑.

Yammer从Scala转向Java

- - InfoQ中文站
经历了一年之久的尝试,Yammer将要从Scala迁回至Java,因为他们发现简洁的语言所带来的好处根本无法抵消培训新员工以及调试性能问题所产生的代价. 文中所提到的邮件也表明通过避免某些模式可以实现性能上的一些改进.

将 Spark Streaming + Kafka direct 的 offset 存入Zookeeper并重用-Spark-about云开发-活到老 学到老

- -
使用Direct API时为什么需要见offset保存到Zookeeper中. 如何将offset存入到Zookeeper中. 如何解决Zookeeper中offset过期问题. 实现将offset存入Zookeeper. 在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细的可以参考 .

Scala支持与Java的隐式转换

- - 简单文本
Neal Ford在几年前提出的“Poly Programming”思想,已经逐渐成为主流. 这种思想并非是为了炫耀多语言的技能,然后选择“高大上”. 真正的目的在于更好地利用各种语言处理不同场景、不同问题的优势. 由于都运行在JVM上,Java与Scala之间基本能做到无缝的集成,区别主要在于各自的API各有不同.

快速了解Scala技术栈

- - 逸言
我无可救药地成为了Scala的超级粉丝. 在我使用Scala开发项目以及编写框架后,它就仿佛凝聚成为一个巨大的黑洞,吸引力使我不得不飞向它,以至于开始背离Java. 固然Java 8为Java阵营增添了一丝亮色,却是望眼欲穿,千呼万唤始出来. 而Scala程序员,却早就在享受lambda、高阶函数、trait、隐式转换等带来的福利了.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.