Spark如何读取一些大数据集到本地机器上

标签: spark 大数据 机器 | 发表时间:2018-01-04 21:07 | 作者:
出处:http://www.iteye.com



最近在使用spark处理分析一些公司的埋点数据,埋点数据是json格式,现在要解析json取特定字段的数据,做一些统计分析,所以有时候需要把数据从集群上拉到driver节点做处理,这里面经常出现的一个问题就是,拉取结果集过大,而驱动节点内存不足,经常导致OOM,也就是我们常见的异常:
````
 java.lang.OutOfMemoryError: Java heap space 
````


这种写法的代码一般如下:

````
//加载HDFS数据
 val rdd=sc.textFile("/data/logs/*")
 
 //在驱动程序获取结果集
 val datas=ArrayBuffer[String]()
 
 //把所有数据,拉倒驱动端操作
 rdd.collect.foreach(line=>{
 
   datas += line.split('#')(1)  //得到某个字段
 
 })
 
 sc.stop()
````


上面的这种写法,基本原理就是一次性把所有分区的数据,全部读取到driver节点上,然后开始做处理,所以数据量大的时候,经常会出现内存溢出情况。


(问题一)如何避免这种情况?

分而治之,每次只拉取一个分区的数据到驱动节点上,处理完之后,再处理下一个分数据的数据。

(问题二)如果单个分区的数据已经大到内存装不下怎么办?


给数据集增加更多的分区,让大分区变成多个小分区。


(问题三)如果结果集数据大于内存的大小怎么办?

要么增加驱动节点的内存,要么给每个分区的数据都持久化本地文件上,不再内存中维护



下面来看下关键问题,如何修改spark的rdd分区数量?


我们知道在spark里面RDD是数据源的抽象模型,RDD里面实际上是把一份大数据源切分成了多个分区数据,然后来并行处理这份大数据集。


默认情况下如果Spark从HDFS上加载数据,默认分区个数是按照HDFS的block size来切分的,当然我们在加载的时候可以指定的分区个数。
````
textFile(path,partitionNums)//第二个参数可以指定分区个数
````


如果在加载时不指定分区个数,spark里面还提供了两个函数来进行重分区:

````
(1)def coalesce(numPartitions: Int, shuffle: Boolean = false):RDD[T]
(2)def repartition(numPartitions: Int):RDD[T]
````


接着我们来看下coalesce函数和repartition函数的区别:


通过查看源码得知repartition函数内部实际上是调用了coalesce函数第二个参数等于true时的封装。所以我们重点来关注下coalesce函数即可:


coalesce的第一个参数是修改后的分区个数

coalesce的第二个参数是控制是否需要shuffle


举一个例子:

当前我们RDD的分区个数是100:

(1)如果要变成10,应该使用
````
rdd.coalesce(10,false)
````
(2)如果要变成300,应该使用
````
rdd.coalesce(300,true)
````
(3)如果要变成1,应该使用
````
rdd.coalesce(1,true)
````


这里解释一下:

分区数从多变少,一般是不需要开启shuffle的,这样性能最高,因为不需要跨网络混洗数据,当然你也可以开启shuffle在特定场景下,如分区数据极其不均衡。但建议一般不要使用。


分区数从少变多,必须开启shuffle,如果不开启那么分区数据是不会改变的,由少变多必须得重新混洗数据才能变多,这里需要注意一点,如果数据量特别少,那么会有一些分区的数据是空。


最后的例子是一种极端场景,如果从多变成1,不开启shuffle,那么可能就个别节点计算压力特别大,集群资源不能充分利用,所以有必要开启shuffle,加速合并计算的流程。



明白了如何改变rdd的分区个数之后,我们就可以文章开头遇到的问题结合起来,拉取大量数据到驱动节点上,如果整体数据集太大,我们就可以增加分区个数,循环拉取,但这里面需要根据具体的场景来设置分区个数,因为分区个数越多,在spark里面生成的task数目就越多,task数目太多也会影响实际的拉取效率,在本案例中,从hdfs上读取的数据默认是144个分区,大约1G多点数据,没有修改分区个数的情况下处理时间大约10分钟,在调整分区个数为10的情况下,拉取时间大约在1-2分钟之间,所以要根据实际情况进行调整。


文章开始前的代码优化后的如下:
````
  def pt_convert( idx:Int,ds:Iterator[String] ,seq:Int):Iterator[String]={
    if(seq==idx) ds else Iterator()
  }
------------------------------
//加载HDFS数据
 val rdd=sc.textFile("/data/logs/*")
 
 //在驱动程序获取结果集
 val datas=ArrayBuffer[String]()
 //重分区并合理优化分区个数
 val new_rdd=rdd.coalesce(10)
 //得到所有的分区信息
 val parts= new_rdd.partitions
 //循环处理每个分区的数据,避免导致OOM
    for(p<-parts){
      //获取分区号
      val idx=p.index
    //第二个参数为true,避免重新shuffle,保留原始分区数据    
    val parRdd=new_rdd.mapPartitionsWithIndex[String](pt_convert(_,_,idx),true)
    //读取结果数据
    val data=parRdd.collect()
    //循环处理数据
    for(line<-data){
    datas += line.split('#')(1)  //得到某个字段
    }
    
      
      }
 

````




最后在看下,spark任务的提交命令:
````
spark-submit  --class  SparkHdfsDataAnalysis 
--conf spark.driver.maxResultSize=1g  
--master yarn  
--executor-cores 5   
--driver-memory 2g  
--executor-memory 3g 
--num-executors 10    
--jars  $jars     spark-analysis.jar  $1 $2
````


这里面主要关注参数:
````
spark.driver.maxResultSize=1g  
driver-memory 2g 
````


单次拉取数据结果集的最大字节数,以及驱动节点的内存,如果在进行大结果集下拉时,需要特别注意下这两个参数的设置。





参考文档:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD
https://spark.apache.org/docs/latest/configuration.html

https://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [spark 大数据 机器] 推荐:

Spark如何读取一些大数据集到本地机器上

- - ITeye博客
最近在使用spark处理分析一些公司的埋点数据,埋点数据是json格式,现在要解析json取特定字段的数据,做一些统计分析,所以有时候需要把数据从集群上拉到driver节点做处理,这里面经常出现的一个问题就是,拉取结果集过大,而驱动节点内存不足,经常导致OOM,也就是我们常见的异常:. ```` //加载HDFS数据 val rdd=sc.textFile("/data/logs/*").

Spark机器学习案例 spark-example: spark mllib example

- -
#这是一个Spark MLlib实例 . ##1 K-means基础实例 . 命名为kmeans_data.txt,且上传到hdfs的/spark/mllib/data/路径中. 在Intellij中,点击file->选择project structure->选择Artifact->添加jar->把乱七八糟的依赖移除->勾选Build on make.

虚拟座谈会:大数据一栈式方案Spark

- - CSDN博客架构设计推荐文章
Spark正在占据越来越多的大数据新闻的重要位置,除了性能优异,Spark到底具备了那些特性,让学术界和工业界对其充满了兴趣. 同时,Spark还处在快速发展的阶段,开发者和用户不得不解决不稳定和bug,Scala语言也有较高的学习门槛,这些也会成为Spark普及的障碍. 当然,尽管Spark提供了一栈式的大数据方案,但并不意味着他适合任何场景.

颠覆大数据分析之Spark弹性分布式数据集

- - 并发编程网 - ifeve.com
颠覆大数据分析之Spark弹性数据集. 译者:黄经业     购书. Spark中迭代式机器学习算法的数据流可以通过图2.3来进行理解. 将它和图2.1中Hadoop MR的迭代式机器学习的数据流比较一下. MR中每次迭代都会涉及HDFS的读写,而在Spark中则要简单得多. 它仅需从HDFS到Spark中的分布式共享对象空间的一次读入——从HDFS文件中创建RDD.

Spark算子:RDD键值转换操作(4)–cogroup、join – lxw的大数据田地

- -
关键字:Spark算子、Spark RDD键值转换、cogroup、join. cogroup相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空. 参数numPartitions用于指定结果的分区数. 参数partitioner用于指定分区函数. ##参数为3个RDD示例略,同上.

Spark动态资源分配-Dynamic Resource Allocation – lxw的大数据田地

- -
关键字:spark、资源分配、dynamic resource allocation. Spark中,所谓资源单位一般指的是executors,和Yarn中的Containers一样,在Spark On Yarn模式下,通常使用–num-executors来指定Application使用的executors数量,而–executor-memory和–executor-cores分别用来指定每个executor所使用的内存和虚拟CPU核数.

[原]基于Spark的大数据精准营销中搜狗搜索引擎的用户画像挖掘

- - Soul Joy Hub
转载请注明:转载 from. from CCF举办的“大数据精准营销中搜狗用户画像挖掘”竞赛. “用户画像”是近几年诞生的名词. 很多营销项目或很多广告主,在打算投放广告前,都要求媒体提供其用户画像. 在以前,大多媒体会针对自身用户做一个分类,但是有了大数据后,企业及消费者行为带来一系列改变与重塑,通过用户画像可以更加拟人化的描述用户特点.

HDFS+Clickhouse+Spark:从0到1实现一款轻量级大数据分析系统

- - InfoQ推荐
导语 | 在产品精细化运营时代,经常会遇到产品增长问题:比如指标涨跌原因分析、版本迭代效果分析、运营活动效果分析等. 这一类分析问题高频且具有较高时效性要求,然而在人力资源紧张情况,传统的数据分析模式难以满足. 本文尝试从0到1实现一款轻量级大数据分析系统——MVP,以解决上述痛点问题. 文章作者:数据熊(笔名),腾讯云大数据分析工程师.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.