SQL on Hadoop最新进展-转载

标签: 转载文章 | 发表时间:2013-10-22 21:47 | 作者:人月神话
出处:http://blog.sina.com.cn/cmmi
原文:http://yanbohappy.sinaapp.com/?p=381

为什么非要把SQL放到Hadoop上? SQL易于使用。那为什么非得基于Hadoop呢?

目前SQL on Hadoop产品主要有以下几种:Hive, Tez/Stinger, Impala, Shark/Spark, Phoenix, Hawq/Greenplum, HadoopDB, Citusdata等。本文主要讨论Hive, Tez/Stinger, Impala, Shark以及传统开源数据仓库brighthouse的特点和最新进展;下一篇文章会讨论Hawq/Greenplum, Phoenix, HadoopDB, Citusdata。

在互联网企业中一般的基于Hadoop的数据仓库的数据来源主要有以下几个:


1,通过Flume/Scribe/Chukwa这样的日志收集和分析系统把来自Apache/nginx等Server cluster的日志收集到HDFS上,然后通过Hive创建Table时指定SerDe把非结构化的日志数据转化成结构化数据。
2,通过Sqoop这样的工具把用户和业务维度数据(一般存储在Oracle/MySQL中)定期导入Hive,那么OLTP数据就有了一个用于OLAP的副本了。
3,通过ETL工具从其他外部DW数据源里导入的数据。

目前所有的SQL on Hadoop产品其实都是在某个或者某些特定领域内适合的,没有silver bullet。像当年Oracle/Teradata这样的满足几乎所有企业级应用的产品在现阶段是不现实的。所以每一种SQL on Hadoop产品都在尽量满足某一类应用的特征。

典型需求:
1,interactive query (ms~3min)
2,data analyst, reporting query (3min~20min)
3,data mining, modeling and large ETL (20 min ~ hr ~ day)
4,机器学习需求(通过MapReduce/MPI/Spark等计算模型来满足)

Hive:Hive是目前互联网企业中处理大数据、构建数据仓库最常用的解决方案,甚至在很多公司部署了Hadoop集群不是为了跑原生MapReduce程序,而全用来跑Hive SQL的查询任务。

对于有很多data scientist和analyst的公司,会有很多相同table的查询需求。那么显然每个人都从hive中查数据速度既慢又浪费资源。 我们在 online的数据库系统部署的时候都会在DB前面部署Redis或者memcache用于缓存用户经常访问的数据。那么OLAP应用也可以参考类似的方 法,把经常访问的数据放到内存组成的集群中供用户查询。

Facebook针对这一需求开发了Presto,一个把热数据放到内存中供SQL查询的系统。这个设计思路跟Impala和Stinger非常类似了。 使用Presto进行简单查询只需要几百毫秒,即使是非常复杂的查询,也只需数分钟即可完成,它在内存中运行,并且不会向磁盘写入。Facebook有超 过850名工程师每天用它来扫描超过320TB的数据,满足了80%的ad-hoc查询需求。

目前Hive的主要缺点:

1,data shuffle时网络瓶颈,Reduce要等Map结束才能开始,不能高效利用网络带宽
2,一般一个SQL都会解析成多个MR job,Hadoop每次Job输出都直接写HDFS,性能差
3,每次执行Job都要启动Task,花费很多时间,无法做到实时
4,由于把SQL转化成MapReduce job时,map,shuffle和reduce所负责执行的SQL功能不同。那么就有Map->MapReduce或者 MapReduce->Reduce这样的需求。这样可以降低写HDFS的次数,从而提高性能。

目前Hive主要的改进:

1,同一条hive sql解析出的多个MR任务的合并。

由Hive解析出来的MR jobs中有非常多的Map->MapReduce类型的job,可以考虑把这个过程合并成一个MRjob。https://issues.apache.org/jira/browse/HIVE-3952

2,Hive query optimizer

http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.0.2/ds_Hive/optimize-joins.html

    Joins where one side fits in memory
    Star schema join的改进,就是原来一个大表和多个小表在不同column匹配的条件下join需要解析成多个map join + MR job,现在可以合并成一个MR job

这个改进方向要做的就是用户不用给太多的hint,hive可以自己根据表的大小、行数等,自动选择最快的join的方法(小表能装进内存的话就用 map join,Map join能和其他MR job合并的就合并)。这个思路跟cost-based query optimizer有点类似了,用户写出来的SQL在翻译成执行计划之前要计算那种执行方式效率更高。

3,ORCFile

ORCFile是一种列式存储的文件,对于分析型应用来说列存有非常大的优势。
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.0.2/ds_Hive/orcfile.html

原来的RCFile中把每一列看成binary blob,没有任何语义,所以只能用通用的zlib,LZO,Snappy等压缩方法。
ORCFile能够获取每一列的类型(int还是string),那么就可以使用诸如dictionary encoding, bit packing, delta encoding, run-length encoding等轻量级的压缩技术。这种压缩技术的优势有两点:一是提高压缩率;二是能够起到过滤无关数据的效果。

现在ORCFile中主要有三种编码:

    bit编码,所有数据类型都可以用。Google’s protocol buffers and uses the high bit to represent whether this byte is not the last and the lower 7 bits to encode data
    run-length encoding(行程长度压缩算法),int类型专用。
    dictionary encoding,string类型专用。同时这个dictionary还能帮助过滤查询中的predicate条件。

Run length Encoding对某些列压缩会减少存储3-4个数量级,对内存提升也有2-3个数量级,Dictionary Encoding一般对磁盘空间减少大概20倍,对内存空间大概减少5倍,根据Google PowerDrill的实验,在常见的聚合查询中这些特殊的编码方式会对查询速度有2-3个数量级的提升.

Predicate Pushdown:原来的Hive是把所有的数据都读到内存中,然后再判断哪些是符合查询需求的。在ORCFile中数据以Stripe为单元读取到内 存,那么ORCFile的RecordReader会根据Stripe的元数据(Index Data,常驻内存)判断该Stripe是否满足这个查询的需求,如果不满足直接略过不读,从而节省了IO。

关于ORCFile的压缩效果,使用情况和性能可以参考hortonworks的博客

http://hortonworks.com/blog/orcfile-in-hdp-2-better-compression-better-performance/

未来ORCFile还会支持轻量级索引,就是每一列中以1W行作为一组的最大值和最小值。

通过对ORCFile的上述分析,我想大家已经看到了brighthouse的影子了吧。都是把列数据相应的索引、统计数据、词典等放到内存中参与查询条件的过滤,如果不符合直接略过不读,大量节省IO。关于brighthouse大家可以参考下面的分析。

4,HiveServer2的Security和Concurrency特性

http://blog.cloudera.com/blog/2013/07/how-hiveserver2-brings-security-and-concurrency-to-apache-hive/

HiveServer2能够支持并发客户端(JDBC/ODBC)的访问。
Cloudera还搞了个Sentry用于Hadoop生态系统的的安全性和授权管理方面的工作。
这两个特点是企业级应用Hadoop/Hive主要关心的。

5,HCatalog Hadoop的统一元数据管理平台

目前Hive存储的表格元数据和HDFS存储的表格数据之间在schema上没有一致性保证,也就是得靠管理员来保证。目前Hive对列的改变只会修改 Hive 的元数据,而不会改变实际数据。比如你要添加一个column,那么你用hive命令行只是修改了了Hive元数据,没有修改HDFS上存储的格式。还得 通过修改导入HDFS的程序来改变HDFS上存储的文件的格式。而且还要重启Hive解析服务,累坏了系统管理员。

    Hadoop系统目前对表的处理是’schema on read’,有了HCatlog就可以做到EDW的’schema on write’。
    HCatlog提供REST接口提供元数据服务,有利于不同平台(HDFS/HBase/Oracle/MySQL)上的不同数据(unstructured/semi-structured/structured)共享。能够把Hadoop和EDW结合起来使用。
    HCatlog对用户解耦了schema和storage format。举个例子吧,在写MR任务的时候,目前是把所有的行数据都当成Text来处理,Text一点点解析出各个Column需要编程人员来控制。 有个HCatlog之后编程人员就不用管这事了,直接告诉它是哪个Database->Table,然后schema可以通过查询HCatlog来 获得。也省得数据存储格式发生变化之后,原来的程序不能用的情况发生。

6,Vectorized Query Execution in Hive

    https://issues.apache.org/jira/browse/HIVE-4160 。目前Hive中一行一行的处理数据,然后调用lazy deserialization解析出该列的Java对象,显然会严重影响效率。
    多行数据同时读取并处理(基本的比较或者数值计算),降低了一行一行处理中过多的函数调用的次数,提高了CPU利用率和cache命中率。需要实现基于向量的vectorized scan, filter, scalar aggregate, group-by-aggregate, hash join等基本操作单元。

Tez/Stinger

    底层执行引擎不再使用MR,而是使用基于YARN的更加通用的DAG执行引擎
    MR是高度抽象的Map和Reduce两个操作,而Tez则是在这两个操作的基础上提供了更丰富的接口。把Map具体到Input, Processor, Sort, Merge, Output,而Reduce也具体化成Input, Shuffle, Sort, Merge, Processor, Output。在MR程序里,编程人员只需编写对应的Processor逻辑,其他的是通过指定几种具体实现来完成的;而在Tez里面给我们更大的自由 度。其实这个跟Spark有点类似了,都是提供更丰富的可操作单元给用户。
    传统的Reduce只能输出到HDFS,而Tez的Reduce Processor能够输出给下一个Reduce Processor作为输入。
    Hot table也放到内存中cache起来
    Tez service:预启动container和container重用,降低了每次Query执行计划生成之后Task启动的时间,从而提高实时性。
    Tez本身只是YARN框架下得一个library,无需部署。只需指定mapreduce.framework.name=yarn-tez

http://dongxicheng.org/mapreduce-nextgen/apache-tez-newest-progress/

未来工作方向:

Cost-based optimizer,基于统计选择执行策略,多表JOIN时按照怎样的顺序执行效率最高。
统计执行过程中每个中间表的Row/Column等数目,从而决定启动多少个MR执行

Impala: Impala可以看成是Google Dremel架构和MPP (Massively Parallel Processing)结构的混合体。

https://github.com/cloudera/impala
Dremel论文: http://research.google.com/pubs/pub36632.html

优点:

  • 目前支持两种类型的JOIN:broadcast join和partition join。对于大表JOIN时由于内存限制,装不下时就要dump部分数据到磁盘,那样就会比较慢
  •  Parguet列存格式,同时能够处理嵌套数据。通过嵌套数据以及扩展的SQL查询语义,在某些特定的场景上避开了JOIN从而解决了一部分性能的bottleneck。
  •  Cloudera Manager 4.6以后会有slow query的分析功能
  •  Runtime Code Generation http://blog.cloudera.com/blog/2013/02/inside-cloudera-impala-runtime-code-generation/
  •  impala可以直接使用硬盘上的数据而不经过hdfs

缺点:

  •  impala不会按照group by的列排序
  •  目前不支持UDF,impala 1.2即将支持Hive UDFs(Java写的)和Impala native UDFs and UDAs(接口类似PosgreSQL)
  •  不支持像Hive的Serializer/Deserializer,从而使得它做从非结构化到结构化数据的ETL工作比较麻烦。
  •  不支持线上查询容错,如果参与查询的某个node出错,Impala将会丢弃本次查询。
  •  安全方面的支持还比较差。impalad之间传输的数据没有加密,不支持表或者列级别的授权。
  •  每个PlanFragment执行尽量并行化,但是有的时候并不是很容易。例如Hash Join需要等到其中一个表完全Scan结束才能开始。

不过虽然有这么多缺点,但是很多公司还是开始尝试Impala了。以百度为例,百度尝试把MySQL接入Impala的后端作为存储引擎,同时实现 相应操作对应的PlanFragment,那么用户来的query还是按照原来的解析方法解析成各种PlanFragment,然后直接调度到对应的节点 (HDFS DataNode/HBase RegionServer/MySQL)上执行。会把某些源数据或者中间数据放到MySQL中,用户的query涉及到使用这部分数据时直接去MySQL 里面拿。

Shark/Spark

由于数据能放到内存尽量放到内存,使用内存非常aggressive。优点是做JOIN时会比较快,缺点是占用内存太大,且自行管理内存,占用内存后不会释放。

性能:特别简单的select…where查询,shark性能的提升不明显。(因为hive也不怎么费时间)
但是如果查询比较复杂select…join…where…group by,hive的job数目会比较多,读写HDFS次数增多,时间自然会变长。当内存还足够大的时候shark性能是最好的,如果内存不够装下所有的数据时性能会下降,但还是会比Hive好很多。

SQL on Hadoop产品需要向传统数据仓库学习的地方

以开源数据仓库brighthouse(基于MySQL的数据仓库存储引擎)为例。
VLDB 2008 论文 <<Brighthouse: An Analytic Data Warehouse for Ad-hoc Queries>>

brighthouse的SQL解析用的是MySQL的代码,开发了brighthouse专用的optimizer,executor以及storage engine;brighthouse的数据存储通过三层来组织:Data Pack, Data Pack Node, Knowledge Node

  • DP(Data Pack):brighthouse是列存储的,每个DP存储一列中64K个单元的数据。
  • DPN(Data Pack Node):DPN和DP是一对一的关系,DPN中记录每个DP数据对应的一些统计值(max,min,count,sum)
  • KN(Knowledge Node):DP的更详细的数据信息和DP之间关系的信息

KN又分为一下三个部分:

  • HISTs(Histograms):数值类型列的统计直方图,能够快速判断这个DP是否符合查询条件。
  • CMAPs(Character Maps):文本类型的位图,用于快速查找字符。(优化关键字like)
  • Pack-To-Pack:等值JOIN操作时产生的两个列(DP)之间关系的位图。

DPN和KN相当于DP的一些统计信息,占整个DP的1%的存储空间,所以可以轻松装入内存。他们是为了快速定位哪些DP是跟这个query相关 (relevant)的,哪些是不相关(irrelevant)的,哪些是可能相关(suspect)的。从而减小IO读取的数据量,提高性能。

性能测试:http://www.fuchaoqun.com/tag/brighthouse/  从这个性能测试中可以看出:

1,压缩率:infobright比MyISAM/tar.gz的压缩率都要高很多
2,查询性能:跟建了索引的MyISAM表相比,查询速度也要快3-6倍

总之,大家都缺少的是:

1,workload management or query optimization
多个表的JOIN如何执行,例如3个表的JOIN会有6种执行策略,那么哪一种才是效率最高的呢。显然要通过计算每种执行顺序的开销来获得。在传统数据库 或者数据仓库领域(Oracle/Teradata/PostgreSQL)都有非常好的查询优化器,而在分布式系统中该如何衡量这些指标(磁盘IO,网 络带宽,内存)与最后查询效率之间的关系是个需要认真研究的问题。

2,关联子查询correlated sub-queries还是没有谁能够实现。

在TPC-H中又很多关联子查询的例子,但是现在的SQL on Hadoop产品都不支持。听Impala的人说,他们客户对这个的需求不是很强烈,大部分关联子查询可以转化成JOIN操作。但是目前的商业产品像Hawq/Greenplum都是支持关联子查询的。

  青春就应该这样绽放   游戏测试:三国时期谁是你最好的兄弟!!   你不得不信的星座秘密

相关 [sql on hadoop] 推荐:

SQL on Hadoop最新进展-转载

- - 人月神话的BLOG
原文:http://yanbohappy.sinaapp.com/?p=381. 为什么非要把SQL放到Hadoop上. 那为什么非得基于Hadoop呢. 目前SQL on Hadoop产品主要有以下几种:Hive, Tez/Stinger, Impala, Shark/Spark, Phoenix, Hawq/Greenplum, HadoopDB, Citusdata等.

SQL on Hadoop最新进展2-转载

- - 人月神话的BLOG
原文:http://yanbohappy.sinaapp.com/?p=407. 上篇主要讨论了Hive, Stinger/Tez, Impala, Shark这些SQL on Hadoop产品,这篇接着讨论Phoenix, Hadapt, Hawq. Salesforce开源的基于HBase的SQL查询系统,建立在HBase client API, coprocessors, custom filter的基础之上.

Hadoop Hive sql语法详解5--HiveQL与SQL区别

- - SQL - 编程语言 - ITeye博客
1.hive内联支持什么格式. 3.hive中empty是否为null. 4.hive是否支持插入现有表或则分区中. 5.hive是否支持INSERT INTO 表 values(). 1、Hive不支持等值连接 . •SQL中对两表内联可以写成:. •分号是SQL语句结束标记,在HiveQL中也是,但是在HiveQL中,对分号的识别没有那么智慧,例如:.

Greenplum Pivotal HD结合了SQL和Hadoop的优势

- - InfoQ cn
EMC Greenplum宣布了一个新的Hadoop发行版本—— Pivotal HD,其中包含一个完全运行于HDFS之上的MPP数据库,兼容SQL,而且速度“比Hive快数百倍”. Pivotal HD支持标准Hadoop发型版本的常用特性(包括HDFS、Pig、Hive、Mahout和Map-Reduce等),但又加入了一些其他的组件,具体如下面结构图所示: .

盘点SQL on Hadoop中用到的主要技术

- - 奔跑的兔子
自hive出现之后,经过几年的发展,SQL on Hadoop相关的系统已经百花齐放,速度越来越快,功能也越来越齐全. 本文并不是要去比较所谓“交互式查询哪家强”,而是试图梳理出一个统一的视角,来看看各家系统有哪些技术上相通之处. 考虑到系统使用的广泛程度与成熟度,在具体举例时一般会拿Hive和Impala为例,当然在调研的过程中也会涉及到一些其他系统,如Spark SQL,Presto,TAJO等.

PL/SQL动态SQL(原创)

- - ITeye博客
使用动态SQL是在编写PL/SQL过程时经常使用的方法之一. 很多情况下,比如根据业务的需要,如果输入不同查询条件,则生成不同的执行SQL查询语句,对于这种情况需要使用动态SQL来完成. 再比如,对于分页的情况,对于不同的表,必定存在不同的字段,因此使用静态SQL则只能针对某几个特定的表来形成分页.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

Hadoop使用(一)

- Pei - 博客园-首页原创精华区
Hadoop使用主/从(Master/Slave)架构,主要角色有NameNode,DataNode,secondary NameNode,JobTracker,TaskTracker组成. 其中NameNode,secondary NameNode,JobTracker运行在Master节点上,DataNode和TaskTracker运行在Slave节点上.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

Hadoop TaskScheduler浅析

- - kouu&#39;s home
TaskScheduler,顾名思义,就是MapReduce中的任务调度器. 在MapReduce中,JobTracker接收JobClient提交的Job,将它们按InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务. 然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务.