Spark如何读取一些大数据集到本地机器上
- - ITeye博客最近在使用spark处理分析一些公司的埋点数据,埋点数据是json格式,现在要解析json取特定字段的数据,做一些统计分析,所以有时候需要把数据从集群上拉到driver节点做处理,这里面经常出现的一个问题就是,拉取结果集过大,而驱动节点内存不足,经常导致OOM,也就是我们常见的异常:. ````
//加载HDFS数据
val rdd=sc.textFile("/data/logs/*").
```` java.lang.OutOfMemoryError: Java heap space ````
````
//加载HDFS数据
val rdd=sc.textFile("/data/logs/*")
//在驱动程序获取结果集
val datas=ArrayBuffer[String]()
//把所有数据,拉倒驱动端操作
rdd.collect.foreach(line=>{
datas += line.split('#')(1) //得到某个字段
})
sc.stop()
````
```` textFile(path,partitionNums)//第二个参数可以指定分区个数 ````
```` (1)def coalesce(numPartitions: Int, shuffle: Boolean = false):RDD[T] (2)def repartition(numPartitions: Int):RDD[T] ````
```` rdd.coalesce(10,false) ````(2)如果要变成300,应该使用
```` rdd.coalesce(300,true) ````(3)如果要变成1,应该使用
```` rdd.coalesce(1,true) ````
````
def pt_convert( idx:Int,ds:Iterator[String] ,seq:Int):Iterator[String]={
if(seq==idx) ds else Iterator()
}
------------------------------
//加载HDFS数据
val rdd=sc.textFile("/data/logs/*")
//在驱动程序获取结果集
val datas=ArrayBuffer[String]()
//重分区并合理优化分区个数
val new_rdd=rdd.coalesce(10)
//得到所有的分区信息
val parts= new_rdd.partitions
//循环处理每个分区的数据,避免导致OOM
for(p<-parts){
//获取分区号
val idx=p.index
//第二个参数为true,避免重新shuffle,保留原始分区数据
val parRdd=new_rdd.mapPartitionsWithIndex[String](pt_convert(_,_,idx),true)
//读取结果数据
val data=parRdd.collect()
//循环处理数据
for(line<-data){
datas += line.split('#')(1) //得到某个字段
}
}
````
```` spark-submit --class SparkHdfsDataAnalysis --conf spark.driver.maxResultSize=1g --master yarn --executor-cores 5 --driver-memory 2g --executor-memory 3g --num-executors 10 --jars $jars spark-analysis.jar $1 $2 ````
```` spark.driver.maxResultSize=1g driver-memory 2g ````