hive insert模式分析

标签: hive python | 发表时间:2022-04-02 18:00 | 作者:zhangshun
出处:http://blog.zhangshun.net

hive写入数据有2种模式,一种是insert into,一种是insert overwrite

参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries

insert into 写入数据

  1. insert 写入数据可以指定字段写入数据,参考资料:As of Hive  1.2.0 each INSERT INTO T can take a column list like INSERT INTO T (z, x, c1).  See Description of  HIVE-9481 for examples.
  2. insert 写入数据的劣势,追溯历史数据需要手工清除源数据,清除数据的方法可以删除hdfs文件,或者清除分区(需要转换成内部表ALTER TABLE {schema_name}.{table_name} SET TBLPROPERTIES ('EXTERNAL'='false');)

insert overwrite 写入数据

  1. insert overwrite 写入数据时,当数据不连续会存在分险,比如耗材数据,并不是每天都有耗材成本,当使用日期+耗材分区时,第一天1.10号->写入1.8号和1.9号耗材数据,第二天1.11号->写入1.7和1.9号数据【如果恰好上游财务调账,将1.8号数据调整为1.7号】此时数据将异常,1.7号数据,1.8号数据重复
  2. 两种数据写入方法的适应场景

    1. insert into 写入数据模式比较适应与会出现数据不连续的分区模型表,写入数据前先清除分区,这样方便刷新表结构信息,同时支持alter table调整表结构,对历史数据非常友好,如果数据同步需要展示会出现较长的展示空白
    2. insert overwrite 写入数据比较适应连续的分区展示表,或者非分区表,数据几乎无缝变化,如果用于模型层,变更表结构需要尽量走重建表的模式。

    这里编写了一个sql重写的方式让insert into模式转换insert overwrite的效果

    简单描述就是先获取insert表的表结构,然后排序成默认的字段顺序,不足补null就ok了

    例如:tab表有col1,col2,col3,col4字段,insert模式是
    insert into tab (col3,col2,col4)
    select col3,col2,col4
    from tab
    应该转换成:
    insert overwrite table tab
    select null col1,col2,col3,col4
    from tab

    具体代码如下:

        def spark_rewrite_sql(self,sql_str,if_overwrite='overwrite'):
            '''
            spark insert 需要将字段写满才可以执行
            传入一个SQL , 返回写满的SQL insert语句 , 将SQL转换成 insert overwrite table 的模式
            '''
            try:
                #方法1的实现
                #去掉注释
                sql_str = re.sub(r'[\s]*--.*','',sql_str)
                #r1 表示获取表名,分区名,insert字段名,select字段名
                r1 = re.search(r'insert\s+into\s+(?:table\s+)?(.*)\s+partition\s+\((.*)\)\s+\((.*)\)\s+select\s+(.*?)\s+from[\s]',sql_str,re.S|re.I)
                table_name    = r1.group(1)
                partition_col = r1.group(2)
                insert_col    = r1.group(3)
                select_col    = r1.group(4) + '\n'
                # 将没有写的字段添加到insert_col的后面,同时将第一个select后面添加空字段
                col_list = re.sub(r'\s','',insert_col).split(',')
                hive_tab_dict = self.run_hive_get_desc_tab_dict(desc_tab_name=self.table_name,get_tab_len='0',use_db=self.schema_name)
                for col in hive_tab_dict.keys():
                    if col not in col_list:
                        insert_col += ','+col
                        select_col += ',null'
                #将select_col转换成有序的insert列
                act_insert_col = re.sub(r'\s','',insert_col).split(',')
                act_select_col = self.comma_split(select_col)
                change_select_col = ''
                for col in hive_tab_dict.keys():
                    hive_col = re.sub(r'\s','',col)
                    change_select_col += act_select_col[act_insert_col.index(hive_col)]+','
                change_select_col = change_select_col[:-1]
                #查询重写
                sql_str = sql_str.replace(r1.group(0)
                                        ,'''insert {if_overwrite} table {table_name} partition ({partition_col})
    select {change_select_col} 
    from '''.format(table_name=table_name,partition_col=partition_col,change_select_col=change_select_col,if_overwrite=if_overwrite))
            except Exception as e:
                print('没有查询重写insert overwrite')
                print(r1)
            return sql_str
    
        def run_hive(self,use_db='app',sql_str='',hive_set=True):
            '''
            #shell模式执行执行hive脚本
            #use_db 库名app
            #sql_str 执行的sql
            '''
            if hive_set:
                hive_set = self.hive_set
            else:
                hive_set = ''
            hive_parameter = '''hive -e "{hive_set}use {use_db};{sql_str}"
            '''.format(sql_str=sql_str,use_db=use_db,hive_set=self.re_replace(hive_set,r'[\s]*--.*','').replace('\n',''))
            cmd_list_values = self.popen(hive_parameter)
            return cmd_list_values
    
        def run_hive_get_desc_tab_dict(self, desc_tab_name ,use_db='app',get_tab_len='1'):
            '''
            获取hive表结构  get_tab_len
            0.  返回格式:字段名,类型,注释
                {'send_tm': ['send_tm', 'string', '发货日期'],'operation_type': ['operation_type', 'string', '仓配类型']}
            '''
            hive_tab_dict = {}
            cmd_list_values = self.run_hive(use_db,'desc {use_db}.{desc_tab_name}'.format(use_db=use_db,desc_tab_name=desc_tab_name))
            for line in cmd_list_values:
                col = line.split('\t')
                column_name = col[0].replace('\n','').strip()
                column_type = col[1].replace('\n','').strip()
                comment_name = col[2].replace('\n','').replace(',',',').strip()
                if column_name not in [''] and column_name[0] != '#':
                    hive_tab_dict[column_name]=[column_name,column_type,comment_name]
            return hive_tab_dict
    
        def comma_split(self,input_str):
            '''
            # 依次递归出括号内的逗号修改成替代的值,然后完成数据切分,这里应该有更好的方式,这里用分割符简单处理的
            '''
            out_put_str = input_str
            loop_num = max([input_str.count(','),input_str.count('('),input_str.count(')')])
            for _ in range(loop_num):
                r1 = re.search(r'\([^()]*\)',out_put_str,re.S|re.I)
                if r1 is not None:
                    r1 = r1.group()
                    r1_change = r1.replace(',','@#_#@#').replace('(','_@__##').replace(')','##__@_')
                    out_put_str = out_put_str.replace(r1,r1_change)
            out_put_list = []
            for line in out_put_str.split(','):
                out_put_list.append(line.replace('@#_#@#',',').replace('_@__##','(').replace('##__@_',')'))
            return out_put_list

相关 [hive insert 模式] 推荐:

hive insert模式分析

- - 要我带你去吗,这个小镇,愿望实现的地方。
hive写入数据有2种模式,一种是insert into,一种是insert overwrite. 参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries.

hive严格模式

- - CSDN博客数据库推荐文章
         hive提供了一个严格模式,可以防止用户执行那些可能产生意想不到的不好的效果的查询.         如果在一个分区表执行hive,除非where语句中包含分区字段过滤条件来显示数据范围,否则不允许执行. 就是用户不允许扫描所有的分区. 进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速.

MySQL insert性能优化

- - Rebill's Blog
对于一些数据量较大的系统,面临的问题除了是查询效率低下,还有一个很重要的问题就是插入时间长. 我们就有一个业务系统,每天的数据导入需要4-5个钟. 这种费时的操作其实是很有风险的,假设程序出了问题,想重跑操作那是一件痛苦的事情. 因此,提高大数据量系统的MySQL insert效率是很有必要的. 经过对MySQL的测试,发现一些可以提高insert效率的方法,供大家参考参考.

Oracle中Merge Into 代替Insert/Update的应用

- - 数据库 - ITeye博客
在进行SQL语句编写时,我们经常会遇到大量的同时进行Insert/Update的语句 ,也就是说当存在记录时,就更新(Update),不存在数据时,就插入(Insert). 在一个同时存在Insert和Update语法的Merge语句中,总共Insert/Update的记录数,就是Using语句中的记录数.

hive调优

- - 互联网 - ITeye博客
一、    控制hive任务中的map数: . 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);.

hive 优化 tips

- - CSDN博客推荐文章
一、     Hive join优化. 也可以显示声明进行map join:特别适用于小表join大表的时候,SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key. 2.     注意带表分区的join, 如:.

Hive中的join

- - CSDN博客云计算推荐文章
select a.* from a join b on a.id = b.id select a.* from a join b on (a.id = b.id and a.department = b.department). 在使用join写查询的时候有一个原则:应该将条目少的表或者子查询放在join操作符的左边.

hive优化(2)

- - 开源软件 - ITeye博客
Hive是将符合SQL语法的字符串解析生成可以在Hadoop上执行的MapReduce的工具. 使用Hive尽量按照分布式计算的一些特点来设计sql,和传统关系型数据库有区别,. 所以需要去掉原有关系型数据库下开发的一些固有思维. 1:尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段.

hive优化

- - 开源软件 - ITeye博客
hive.optimize.cp=true:列裁剪. hive.optimize.prunner:分区裁剪. hive.limit.optimize.enable=true:优化LIMIT n语句. hive.limit.optimize.limit.file=10:最大文件数.   1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB).

Hive优化

- - 互联网 - ITeye博客
     使用Hive有一段时间了,目前发现需要进行优化的较多出现在出现join、distinct的情况下,而且一般都是reduce过程较慢.      Reduce过程比较慢的现象又可以分为两类:. 情形一:map已经达到100%,而reduce阶段一直是99%,属于数据倾斜. 情形二:使用了count(distinct)或者group by的操作,现象是reduce有进度但是进度缓慢,31%-32%-34%...一个附带的提示是使用reduce个数很可能是1.