如何在CDH5上运行Spark应用

标签: cdh5 spark 应用 | 发表时间:2015-02-04 00:00 | 作者:
出处:http://blog.javachen.com/rss.xml

这篇文章参考 How-to: Run a Simple Apache Spark App in CDH 5 编写而成,没有完全参照原文翻译,而是重新进行了整理,例如:spark 版本改为 1.2.0-cdh5.3.0,添加了 Python 版的程序。

本文主要记录在 CDH5 集群环境上如何创建一个 Scala 的 maven 工程并且编写、编译和运行一个简单的 Spark 程序。原文中的代码在 https://github.com/sryza/simplesparkapp,我 fork 之后做了一些修改,参见 https://github.com/javachen/simplesparkapp

创建 maven 工程

使用下面命令创建一个普通的 maven 工程:

   $ mvn archetype:generate -DgroupId=com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

将 sparkwordcount 目录重命名为simplesparkapp,然后,在 simplesparkapp 目录下添加 scala 源文件目录:

   $ mkdir -p sparkwordcount/src/main/scala/com/cloudera/sparkwordcount

修改 pom.xml 添加 scala 和 spark 依赖:

     <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0-cdh5.3.0</version>
    </dependency>
  </dependencies>

添加编译 scala 的插件:

    <plugin>
  <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
</plugin>

添加 scala 编译插件需要的仓库:

   <pluginRepositories>
  <pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>

另外,添加 cdh hadoop 的仓库:

     <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
    <repository>
      <id>maven-hadoop</id>
      <name>Hadoop Releases</name>
      <url>https://repository.cloudera.com/content/repositories/releases/</url>
    </repository>
    <repository>
      <id>cloudera-repos</id>
      <name>Cloudera Repos</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

最后,完整的 pom.xml 文件见: https://github.com/javachen/simplesparkapp/blob/master/pom.xml

运行下面命令检查工程是否能够成功编译:

   mvn package

编写示例代码

WordCount 为例,该程序需要完成以下逻辑:

  • 读一个输入文件
  • 统计每个单词出现次数
  • 过滤少于一定次数的单词
  • 对剩下的单词统计每个字母出现次数

在 MapReduce 中,上面的逻辑需要两个 MapReduce 任务,而在 Spark 中,只需要一个简单的任务,并且代码量会少 90%。

编写 Scala 程序 如下:

   import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
    val threshold = args(1).toInt

    // split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with less than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))

    charCounts.saveAsTextFile("world-count-result")
  }
}

Spark 使用懒执行的策略,意味着只有当 动作执行的时候, 转换才会运行。上面例子中的 动作操作是 collectsaveAsTextFile,前者是将数据推送给客户端,后者是将数据保存到 HDFS。

作为对比, Java 版的程序 如下:

   import java.util.ArrayList;
import java.util.Arrays;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
    final int threshold = Integer.parseInt(args[1]);

    // split each document into words
    JavaRDD tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction() {
        public Iterable call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );

    // count the occurrence of each word
    JavaPairRDD counts = tokenized.mapToPair(
      new PairFunction() {
        public Tuple2 call(String s) {
          return new Tuple2(s, 1);
        }
      }
    ).reduceByKey(
      new Function2() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

另外, Python 版的程序 如下:

   import sys

from pyspark import SparkContext

file="inputfile.txt"
count=2

if __name__ == "__main__":
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(file, 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1))  \
                  .reduceByKey(lambda a, b: a + b)  \
                  .filter(lambda (a, b) : b >= count)  \
                  .flatMap(lambda (a, b): list(a))  \
                  .map(lambda x: (x, 1))  \
                  .reduceByKey(lambda a, b: a + b)

    print ",".join(str(t) for t in counts.collect())
    sc.stop()

编译

运行下面命令生成 jar:

   $ mvn package

运行成功之后,会在 target 目录生成 sparkwordcount-0.0.1-SNAPSHOT.jar 文件。

运行

因为项目依赖的 spark 版本是 1.2.0-cdh5.3.0,所以下面的命令只能在 CDH 5.3 集群上运行。

首先,将测试文件 inputfile.txt 上传到 HDFS 上;

   $ wget https://github.com/javachen/simplesparkapp/blob/master/data/inputfile.txt
$ hadoop fs -put inputfile.txt

其次,将 sparkwordcount-0.0.1-SNAPSHOT.jar 上传到集群中的一个节点;然后,使用 spark-submit 脚本运行 Scala 版的程序:

   $ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2

或者,运行 Java 版本的程序:

   $ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2

对于 Python 版的程序,运行脚本为:

   $ spark-submit  --master local PythonWordCount.py

如果,你的集群部署的是 standalone 模式,则你可以替换 master 参数的值为 spark://<master host>:<master port>,也可以以 Yarn 的模式运行。

最后的 Python 版的程序运行输出结果如下:

   (u'a', 4),(u'c', 1),(u'b', 1),(u'e', 6),(u'f', 1),(u'i', 1),(u'h', 1),(u'l', 1),(u'o', 2),(u'n', 4),(u'p', 2),(u'r', 2),(u'u', 1),(u't', 2),(u'v', 1)

完整的代码在: https://github.com/javachen/simplesparkapp

相关 [cdh5 spark 应用] 推荐:

如何在CDH5上运行Spark应用

- - JavaChen's Blog
How-to: Run a Simple Apache Spark App in CDH 5 编写而成,没有完全参照原文翻译,而是重新进行了整理,例如:spark 版本改为. 1.2.0-cdh5.3.0,添加了 Python 版的程序. 本文主要记录在 CDH5 集群环境上如何创建一个 Scala 的 maven 工程并且编写、编译和运行一个简单的 Spark 程序.

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用 - shishanyuan - 博客园

- -
【注】该系列文章以及使用到安装包/测试数据 可以在《. 倾情大奉送--Spark入门实战系列》获取. l 主机操作系统:Windows 64位,双核4线程,主频2.2G,10G内存. l 虚拟软件:VMware® Workstation 9.0.0 build-812388. l 虚拟机操作系统:CentOS 64位,单核.

浅谈 Spark 应用程序的性能调优

- - SegmentFault 最新的文章
Spark是基于内存的分布式计算引擎,以处理的高效和稳定著称. 然而在实际的应用开发过程中,开发者还是会遇到种种问题,其中一大类就是和性能相关. 在本文中,笔者将结合自身实践,谈谈如何尽可能地提高应用程序性能. 分布式计算引擎在调优方面有四个主要关注方向,分别是CPU、内存、网络开销和I/O,其具体调优目标如下:.

Spark技术在京东智能供应链预测的应用

- - IT瘾-bigdata
前段时间京东公开了面向第二个十二年的战略规划,表示京东将全面走向技术化,大力发展人工智能和机器人自动化技术,将过去传统方式构筑的优势全面升级. 京东Y事业部顺势成立,该事业部将以服务泛零售为核心,着重智能供应能力的打造,核心使命是利用人工智能技术来驱动零售革新. 1.1   京东的供应链. 京东一直致力于通过互联网电商建立需求侧与供给侧的精准、高效匹配,供应链管理是零售联调中的核心能力,是零售平台能力的关键体现,也是供应商与京东紧密合作的纽带,更是未来京东智能化商业体布局中的核心环节.

离线安装Cloudera Manager 5和CDH5(最新版5.1.3) 完全教程 - StanZhai

- - 博客园_首页
关于CDH和Cloudera Manager. CDH (Cloudera's Distribution, including Apache Hadoop),是Hadoop众多分支中的一种,由Cloudera维护,基于稳定版本的Apache Hadoop构建,并集成了很多补丁,可直接用于生产环境. Cloudera Manager则是为了便于在集群中进行Hadoop等大数据处理相关的服务安装和监控管理的组件,对集群中主机、Hadoop、Hive、Spark等服务的安装配置管理做了极大简化.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Spark迷思

- - ITeye博客
目前在媒体上有很大的关于Apache Spark框架的声音,渐渐的它成为了大数据领域的下一个大的东西. 证明这件事的最简单的方式就是看google的趋势图:. 上图展示的过去两年Hadoop和Spark的趋势. Spark在终端用户之间变得越来越受欢迎,而且这些用户经常在网上找Spark相关资料. 这给了Spark起了很大的宣传作用;同时围绕着它的也有误区和思维错误,而且很多人还把这些误区作为银弹,认为它可以解决他们的问题并提供比Hadoop好100倍的性能.

Spark 优化

- - CSDN博客推荐文章
提到Spark与Hadoop的区别,基本最常说的就是Spark采用基于内存的计算方式,尽管这种方式对数据处理的效率很高,但也会往往引发各种各样的问题,Spark中常见的OOM等等. 效率高的特点,注定了Spark对性能的严苛要求,那Spark不同程序的性能会碰到不同的资源瓶颈,比如:CPU,带宽、内存.

Spark&Spark性能调优实战

- - CSDN博客互联网推荐文章
       Spark特别适用于多次操作特定的数据,分mem-only和mem & disk. 其中mem-only:效率高,但占用大量的内存,成本很高;mem & disk:内存用完后,会自动向磁盘迁移,解决了内存不足的问题,却带来了数据的置换的消费. Spark常见的调优工具有nman、Jmeter和Jprofile,以下是Spark调优的一个实例分析:.