Spark-1.3.1与Hive整合实现查询分析

标签: Spark 开源技术 | 发表时间:2015-05-14 21:42 | 作者:Yanjun
出处:http://shiyanjun.cn

在大数据应用场景下,使用过Hive做查询统计分析的应该知道,计算的延迟性非常大,可能一个非常复杂的统计分析需求,需要运行1个小时以上,但是比之于使用MySQL之类关系数据库做分析,执行速度快很多很多。使用HiveQL写类似SQL的查询分析语句,最终经过Hive查询解析器,翻译成Hadoop平台上的MapReduce程序进行运行,这也是MapReduce计算引擎的特点带来的延迟问题:Map中间结果写文件。如果一个HiveQL语句非常复杂,会被翻译成多个MapReduce Job,那么就会有很多的Map输出中间结果数据到文件中,基本没有数据的共享。
如果使用Spark计算平台,基于Spark RDD数据集模型计算,可以减少计算过程中产生中间结果数据写文件的开销,Spark会把数据直接放到内存中供后续操作共享数据,减少了读写磁盘I/O操作带来的延时。另外,如果基于Spark on YARN部署模式,可以充分利用数据在Hadoop集群DataNode节点的本地性(Locality)特点,减少数据传输的通信开销。

软件准备

我把使用的相关软件的版本在这里列出来,以便测试验证,如下所示:

  • CentOS-6.6 (Final)
  • JDK-1.7.0_25
  • Maven-3.2.1
  • Hadoop-2.2.0
  • Spark-1.3.1
  • Hive-0.12.0
  • MySQL-Server-5.5.8

另外还要搭建好Hadoop集群,以及安装配置好Hive客户端,能够在Hive上正确执行查询分析,安装过程不再累述,可以参考网上很多文档。由于我们使用最新版本的Spark-1.3.1,为了使用我们现有2.2.0版本的Hadoop平台,所以需要重新编译构建Spark程序,接下来会做详细说明。
这里,给出使用的各个集群环境的结构拓扑,如下表所示:

Source节点 服务名称 说明
hadoop1 Spark Master/Spark Driver Spark集群
hadoop2 DataNode/NodeManager Hadoop集群
hadoop3 DataNode/NodeManager Hadoop集群
hadoop4 Hive Hive客户端
hadoop5 Spark Worker Spark集群
hadoop6 Spark Worker/NameNode/ResourceManager/Secondary NameNode Spark集群/Hadoop集群
10.10.4.130 MySQL 用于存储Hive元数据

上述节点配置相同,因为是测试机,所以配置相对比较低。我们是分别将Spark集群和Hadoop集群的Worker和NodeManager/DataNode分开部署了,在使用Spark做计算的时候,就没有数据本地性(Locality)的特性,所以如果基于Spark on YARN的模式,可能会获得更好地计算性能的提升。

Spark编译安装配置

首先从官网下在Spark源码文件:

cd ~/
wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1.tgz
tar xvzf spark-1.3.1.tgz
mv spark-1.3.1 spark-1.3.1-bin-hadoop2.2

我的环境是JDK 1.7,使用Maven构建,执行如下命令行:

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
cd /home/spark/spark-1.3.1-bin-hadoop2.2/
mvn -Pyarn -Dyarn.version=2.2.0 -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package

编译构建完成以后,可以看到如下内容:

/home/spark/spark-1.3.1-bin-hadoop2.2/assembly/target/scala-2.10/spark-assembly-1.3.1-hadoop2.2.0.jar
/home/spark/spark-1.3.1-bin-hadoop2.2/lib_managed/*.jar

如果网络状况不好,可能无法构建成功。
另外,也可以使用sbt构建,执行如下命令:

cd /home/spark/spark-1.3.1-bin-hadoop2.2/
build/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0 -Phive-thriftserver assembly

如果失败,多试几次可能会以成功。
使用Maven构建与使用sbt构建,都要耗费很长时间,而且最终生成的文件可能会有所不同。

下面,我们配置Spark集群,首先在Spark Master节点上配置,修改配置文件conf/slaves,将Worker节点主机名加入进去,一行一个,内容如下所示:

hadoop5
hadoop6

修改Spark环境变量配置文件conf/spark-env.sh,增加如下配置行:

SPARK_MASTER_IP=hadoop1

登录到Hive安装的节点,将Hive的配置文件拷贝到Spark安装目录下的conf目录下面,执行如下命令:

scp /usr/local/hive/conf/hive-site.xml spark@hadoop1:/home/spark/spark-1.3.1-bin-hadoop2.2/conf/

最后分发Spark安装文件到Spark Worker节点上:

sudo scp -r /home/spark/spark-1.3.1-bin-hadoop2.2 spark@hadoop5:/home/spark/spark-1.3.1-bin-hadoop2.2/
sudo scp -r /home/spark/spark-1.3.1-bin-hadoop2.2 spark@hadoop6:/home/spark/spark-1.3.1-bin-hadoop2.2/

为了方便启动Spark集群,可以配置Spark Master到Workers的ssh免密码登录,然后只需要在Master中执行如下脚本即可:

sbin/start-all.sh

可以查看Spark各个节点的服务启动情况,也可以通过Spark UI链接进入页面查看 http://hadoop1:8080/,默认是8080端口,如果8080端口已经被占用,Spark会自动选择端口号数字加1,如 http://hadoop1:8081/

Spark+Hive整合

我们知道,在使用Hive进行查询的时候,到底层MapReduce计算层会将HiveQL翻译成MapReduce程序,在Hadoop平台上执行计算,这使得计算的延迟比较大。我们整合Spark和Hive,就是通过Spark平台来计算Hive查询,也就是Hive不再使用它默认的MapReduce计算引擎,Spark会直接读取Hive的元数据存储,将Hive数据转换成Spark RDD数据,通过Spark提供的计算操作来实现(Transformation和Action)。
我们首先在Hive中创建一个数据库event_db,执行如下命令:

CREATE DATABASE event_db;

在创建一个Hive外部表user_event,执行DDL脚本:

CREATE EXTERNAL TABLE event_db.user_event(
  appid string,
  event_code string,
  udid string,
  uid string,
  install_id string,
  session_id string,
  play_id string,
  page string,
  timestamp string,
  action string,
  network string,
  operator string,
  lon string,
  lat string,
  imsi string,
  speed string,
  event_id string,
  type string,
  result string,
  refer string,
  radio_id bigint,
  audio_id bigint,
  play_time bigint,
  duration bigint,
  start_time string,
  end_time string,
  request_agent string,
  request_referer string,
  device_id string,
  model_id string,
  area_tag string,
  remarks4 string,
  remarks5 string,
  ip bigint,
  area_code int,
  create_time string)
PARTITIONED BY (
  create_date string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://hadoop6:8020/hive/event_db/user_event';

我选择了一天的用户事件数据(大概有5G左右,13824560条记录),将数据加载到Hive的分区中,执行如下LOAD命令行:

LOAD DATA LOCAL INPATH '/home/shirdrn/data/user_event_20150511.log' OVERWRITE INTO TABLE event_db.user_event PARTITION (create_date='2015-05-11');
  • Standalone模式

我们可以通过指定SPARK_CLASSPATH变量,将需要访问Hive的元数据存储MySQL的驱动包加入进去,然后直接启动Spark SQL Shell即可。这里,使用Spark默认的集群管理模式Standalone,启动Shell时需要指定master选项为Spark Master服务连接:

SPARK_CLASSPATH="$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
bin/spark-sql --master spark://hadoop1:7077

这样我们可以直接在Spark SQL Shell上输入Hive查询语句就可以执行查询分析计算。
另外,还可以通过Spark Shell进行操作,不过需要了解Spark SQL支持的Scala API,启动Spark Shell,执行如下命令:

SPARK_CLASSPATH="$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
bin/spark-shell --master spark://hadoop1:7077

然后,创建一个org.apache.spark.sql.hive.HiveContext对象,用来执行Hive查询:

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@6dcc664b

接着可以执行查询:

scala> sqlContext.sql("SELECT area_code,event_code,COUNT(udid) AS user_cnt FROM event_db.user_event WHERE create_date='2015-05-11' GROUP BY area_code,event_code LIMIT 10").collect().foreach(println)

可以看到查询结果。

  • yarn-client模式

如果基于YARN模式运行(与Hive整合只支持yarn-client模式,不支持yarn-cluster),需要指定Hadoop集群的环境变量(在当前Driver节点上必须有Hadoop的安装文件),如下所示:

export HADOOP_HOME=/usr/local/hadoop-2.2.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

然后启动Spark SQL Shell,执行如下命令:

SPARK_CLASSPATH="$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
bin/spark-sql --master yarn-client
  • 查询结果耗时比较

我们使用Hive,以及上面提到的两种模式分别执行如下HiveQL查询统计语句:

SELECT area_code,event_code,COUNT(DISTINCT udid) AS user_cnt FROM event_db.user_event WHERE create_date='2015-05-11' AND (create_time BETWEEN '2015-05-11 17:00:00' AND '2015-05-11 23:30:00') GROUP BY area_code,event_code ORDER BY user_cnt DESC LIMIT 10

可以看到查询结果,结果如下所示:

156000000     100003     8290
110000     100003     7832
440100     100003     4956
110000     100010     3850
440300     100003     3709
320100     100003     3683
410100     100003     3669
110000     101014     3479
110000     200004     3455
110000     100011     3423

对比耗时,如下表所示:

运行模式 花费时间(秒)
Hive 189.695
Spark Standalone 82.895
Spark yarn-client 104.259

可见,无论是Spark Standalone模式还是Spark yarn-client模式,耗时都比直接执行Hive查询要少得多。我们执行Spark计算,2个Worker节点上各用了一个Executor,每个Executor使用512M内存,如果增加Executor个数,或者调大内存,应该比上面运行耗时更少,例如,启动Spark SQL Shell并指定相关参数:

bin/spark-sql --master spark://hadoop1:7077 --driver-memory 1G --driver-cores 2 --executor-memory 4G

或者:

bin/spark-sql --master yarn-client --driver-memory 1G --driver-cores 2 --executor-cores 4 --num-executors 8 --executor-memory 4G

总结

根据上面我们实践的整合Spark+Hive,在执行复杂统计分析时,完全可以使用Spark SQL来替代Hive,至少会提高几倍的速度,对于一些基于Hive统计应用,可能每天晚上要执行6个小时以上的统计计算,导致第二天结果数据都无法出来,如果统计需求再次增加,可能时间还会更长。除了对Hive查询语句进行优化之外,应该说优化空间不大,所以这个时候可以考虑使用Spark平台来实现统计分析,而且,Spark集群可以线性扩展,对于一些调优也更容易一些。
另外,Spark的发展超级迅猛,新版本频繁发布,而且在后期的版本中还会在性能方面进行大幅改进。Tungsten项目将是Spark自诞生以来内核级别的最大改动,以大幅度提升Spark应用程序的内存和CPU利用率为目标,旨在最大程度上压榨新时代硬件性能。Tungsten项目包括了3个方面的努力:

  • Memory Management和Binary Processing:利用应用的语义(Application Semantics)来更明确地管理内存,同时消除JVM对象模型和垃圾回收开销。
  • Cache-aware Computation(缓存友好的计算):使用算法和数据结构来实现内存分级结构(Memory Hierarchy)。
  • 代码生成(Code Generation):使用代码生成来利用新型编译器和CPU。

Tungsten将大幅度提升Spark的核心引擎,在Spark 1.4版本,会包括Dataframe API中聚合操作的内存管理,以及定制化序列化器。在Spark 1.5版本中,会有部分项目(基于DataFrame模型)包括二进制内存管理的扩展和Cache-aware数据结构。

参考链接

相关 [spark hive 分析] 推荐:

Flume+Spark+Hive+Spark SQL离线分析系统

- - CSDN博客推荐文章
前段时间把Scala和Spark一起学习了,所以借此机会在这里做个总结,顺便和大家一起分享一下目前最火的分布式计算技术Spark. 当然Spark不光是可以做离线计算,还提供了许多功能强大的组件,比如说,Spark Streaming 组件做实时计算,和Kafka等消息系统也有很好的兼容性;Spark Sql,可以让用户通过标准SQL语句操作从不同的数据源中过来的结构化数据;还提供了种类丰富的MLlib库方便用户做机器学习等等.

beeline 连接SPARK /Hive

- - 开源软件 - ITeye博客
hiveclient所在主机的jdk 1.7_51,hive 0.12和hadoop 2.3.0是从服务器端拷贝过来的,环境变量一切OK. 执行连接报了Invalid URL的错误:. 开始的一段时间都在纠结这个jdbc的URL格式问题,后来在cloudra论坛上找到了一个方法,. 直接调用的jdbc:hive2的驱动测试是正常的,证明CLASSPATH等环境变量没有问题.

Spark-1.3.1与Hive整合实现查询分析

- - 简单之美
在大数据应用场景下,使用过Hive做查询统计分析的应该知道,计算的延迟性非常大,可能一个非常复杂的统计分析需求,需要运行1个小时以上,但是比之于使用MySQL之类关系数据库做分析,执行速度快很多很多. 使用HiveQL写类似SQL的查询分析语句,最终经过Hive查询解析器,翻译成Hadoop平台上的MapReduce程序进行运行,这也是MapReduce计算引擎的特点带来的延迟问题:Map中间结果写文件.

hive参数hive.mapred.mode分析

- - CSDN博客云计算推荐文章
Hive配置中有个参数hive.mapred.mode,分为nonstrict,strict,默认是nonstrict. 如果设置为strict,会对三种情况的语句在compile环节做过滤:. 这种情况由于没有指定reduce join key,所以只会启用一个reducer,数据量大时会造成性能瓶颈.

实时分析系统(HIVE/HBASE/IMPALA)浅析

- - 数据库 - ITeye博客
1. 什么是实时分析(在线查询)系统. 大数据领域里面,实时分析(在线查询)系统是最常见的一种场景,通常用于客户投诉处理,实时数据分析,在线查询等等过. 因为是查询应用,通常有以下特点:. b. 查询条件复杂(多个维度,维度不固定),有简单(带有ID). c. 查询范围大(通常查询表记录在几十亿级别).

Hive中小表与大表关联(join)的性能分析

- - 数据库 - ITeye博客
经常看到一些Hive优化的建议中说当小表与大表做关联时,把小表写在前面,这样可以使Hive的关联速度更快,提到的原因都是说因为小表可以先放到内存中,然后大表的每条记录再去内存中检测,最终完成关联查询. 这样的原因看似合理,但是仔细推敲,又站不住脚跟. 如果所谓的小表在内存中放不下怎么办. 我用2个只有几条记录的表做关联查询,这应该算是小表了,在查看reduce的执行日志时依然是有写磁盘的操作的.

Hive中跑MapReduce Job出现OOM问题分析及解决

- - CSDN博客云计算推荐文章
今天在跑一段很复杂而且涉及数据量10多年的N个表join的长SQL时,发生了OOM的异常. 由于一个map通常配置只有64MB或者128MB,则在Map阶段出现OOM的情况很少见. 所以一般发生在reduce阶段. 但是今天这个异常详细的看后,会发现既不是map阶段,也不是reduce阶段,发现不是执行过程,而是driver提交job阶段就OOM了.

Spark Shuffle过程分析:Map阶段处理流程

- - 简单之美
默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算. 我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示:. 如果需要修改ShuffleManager实现,则只需要修改配置项spark.shuffle.manager即可,默认支持sort和 tungsten-sort,可以指定自己实现的ShuffleManager类.

Spark Streaming 1.6 流式状态管理分析 - 简书

- -
Spark 1.6发布后,官方声称流式状态管理有10倍性能提升. 这篇文章会详细介绍Spark Streaming里新的流式状态管理. 在流式计算中,数据是持续不断来的,有时候我们要对一些数据做跨周期(Duration)的统计,这个时候就不得不维护状态了. 而状态管理对Spark 的 RDD模型是个挑战,因为在spark里,任何数据集都需要通过RDD来呈现,而RDD 的定义是一个不变的分布式集合.

颠覆大数据分析之Spark弹性分布式数据集

- - 并发编程网 - ifeve.com
颠覆大数据分析之Spark弹性数据集. 译者:黄经业     购书. Spark中迭代式机器学习算法的数据流可以通过图2.3来进行理解. 将它和图2.1中Hadoop MR的迭代式机器学习的数据流比较一下. MR中每次迭代都会涉及HDFS的读写,而在Spark中则要简单得多. 它仅需从HDFS到Spark中的分布式共享对象空间的一次读入——从HDFS文件中创建RDD.