Spark如何读取一些大数据集到本地机器上
- - ITeye博客最近在使用spark处理分析一些公司的埋点数据,埋点数据是json格式,现在要解析json取特定字段的数据,做一些统计分析,所以有时候需要把数据从集群上拉到driver节点做处理,这里面经常出现的一个问题就是,拉取结果集过大,而驱动节点内存不足,经常导致OOM,也就是我们常见的异常:. ````
//加载HDFS数据
val rdd=sc.textFile("/data/logs/*").
```` java.lang.OutOfMemoryError: Java heap space ````
```` //加载HDFS数据 val rdd=sc.textFile("/data/logs/*") //在驱动程序获取结果集 val datas=ArrayBuffer[String]() //把所有数据,拉倒驱动端操作 rdd.collect.foreach(line=>{ datas += line.split('#')(1) //得到某个字段 }) sc.stop() ````
```` textFile(path,partitionNums)//第二个参数可以指定分区个数 ````
```` (1)def coalesce(numPartitions: Int, shuffle: Boolean = false):RDD[T] (2)def repartition(numPartitions: Int):RDD[T] ````
```` rdd.coalesce(10,false) ````(2)如果要变成300,应该使用
```` rdd.coalesce(300,true) ````(3)如果要变成1,应该使用
```` rdd.coalesce(1,true) ````
```` def pt_convert( idx:Int,ds:Iterator[String] ,seq:Int):Iterator[String]={ if(seq==idx) ds else Iterator() } ------------------------------ //加载HDFS数据 val rdd=sc.textFile("/data/logs/*") //在驱动程序获取结果集 val datas=ArrayBuffer[String]() //重分区并合理优化分区个数 val new_rdd=rdd.coalesce(10) //得到所有的分区信息 val parts= new_rdd.partitions //循环处理每个分区的数据,避免导致OOM for(p<-parts){ //获取分区号 val idx=p.index //第二个参数为true,避免重新shuffle,保留原始分区数据 val parRdd=new_rdd.mapPartitionsWithIndex[String](pt_convert(_,_,idx),true) //读取结果数据 val data=parRdd.collect() //循环处理数据 for(line<-data){ datas += line.split('#')(1) //得到某个字段 } } ````
```` spark-submit --class SparkHdfsDataAnalysis --conf spark.driver.maxResultSize=1g --master yarn --executor-cores 5 --driver-memory 2g --executor-memory 3g --num-executors 10 --jars $jars spark-analysis.jar $1 $2 ````
```` spark.driver.maxResultSize=1g driver-memory 2g ````