hive insert模式分析
- - 要我带你去吗,这个小镇,愿望实现的地方。hive写入数据有2种模式,一种是insert into,一种是insert overwrite. 参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries.
hive写入数据有2种模式,一种是insert into,一种是insert overwrite
参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
insert into 写入数据
insert overwrite 写入数据
两种数据写入方法的适应场景
这里编写了一个sql重写的方式让insert into模式转换insert overwrite的效果
简单描述就是先获取insert表的表结构,然后排序成默认的字段顺序,不足补null就ok了
例如:tab表有col1,col2,col3,col4字段,insert模式是 insert into tab (col3,col2,col4) select col3,col2,col4 from tab 应该转换成: insert overwrite table tab select null col1,col2,col3,col4 from tab
具体代码如下:
def spark_rewrite_sql(self,sql_str,if_overwrite='overwrite'):
'''
spark insert 需要将字段写满才可以执行
传入一个SQL , 返回写满的SQL insert语句 , 将SQL转换成 insert overwrite table 的模式
'''
try:
#方法1的实现
#去掉注释
sql_str = re.sub(r'[\s]*--.*','',sql_str)
#r1 表示获取表名,分区名,insert字段名,select字段名
r1 = re.search(r'insert\s+into\s+(?:table\s+)?(.*)\s+partition\s+\((.*)\)\s+\((.*)\)\s+select\s+(.*?)\s+from[\s]',sql_str,re.S|re.I)
table_name = r1.group(1)
partition_col = r1.group(2)
insert_col = r1.group(3)
select_col = r1.group(4) + '\n'
# 将没有写的字段添加到insert_col的后面,同时将第一个select后面添加空字段
col_list = re.sub(r'\s','',insert_col).split(',')
hive_tab_dict = self.run_hive_get_desc_tab_dict(desc_tab_name=self.table_name,get_tab_len='0',use_db=self.schema_name)
for col in hive_tab_dict.keys():
if col not in col_list:
insert_col += ','+col
select_col += ',null'
#将select_col转换成有序的insert列
act_insert_col = re.sub(r'\s','',insert_col).split(',')
act_select_col = self.comma_split(select_col)
change_select_col = ''
for col in hive_tab_dict.keys():
hive_col = re.sub(r'\s','',col)
change_select_col += act_select_col[act_insert_col.index(hive_col)]+','
change_select_col = change_select_col[:-1]
#查询重写
sql_str = sql_str.replace(r1.group(0)
,'''insert {if_overwrite} table {table_name} partition ({partition_col})
select {change_select_col}
from '''.format(table_name=table_name,partition_col=partition_col,change_select_col=change_select_col,if_overwrite=if_overwrite))
except Exception as e:
print('没有查询重写insert overwrite')
print(r1)
return sql_str
def run_hive(self,use_db='app',sql_str='',hive_set=True):
'''
#shell模式执行执行hive脚本
#use_db 库名app
#sql_str 执行的sql
'''
if hive_set:
hive_set = self.hive_set
else:
hive_set = ''
hive_parameter = '''hive -e "{hive_set}use {use_db};{sql_str}"
'''.format(sql_str=sql_str,use_db=use_db,hive_set=self.re_replace(hive_set,r'[\s]*--.*','').replace('\n',''))
cmd_list_values = self.popen(hive_parameter)
return cmd_list_values
def run_hive_get_desc_tab_dict(self, desc_tab_name ,use_db='app',get_tab_len='1'):
'''
获取hive表结构 get_tab_len
0. 返回格式:字段名,类型,注释
{'send_tm': ['send_tm', 'string', '发货日期'],'operation_type': ['operation_type', 'string', '仓配类型']}
'''
hive_tab_dict = {}
cmd_list_values = self.run_hive(use_db,'desc {use_db}.{desc_tab_name}'.format(use_db=use_db,desc_tab_name=desc_tab_name))
for line in cmd_list_values:
col = line.split('\t')
column_name = col[0].replace('\n','').strip()
column_type = col[1].replace('\n','').strip()
comment_name = col[2].replace('\n','').replace(',',',').strip()
if column_name not in [''] and column_name[0] != '#':
hive_tab_dict[column_name]=[column_name,column_type,comment_name]
return hive_tab_dict
def comma_split(self,input_str):
'''
# 依次递归出括号内的逗号修改成替代的值,然后完成数据切分,这里应该有更好的方式,这里用分割符简单处理的
'''
out_put_str = input_str
loop_num = max([input_str.count(','),input_str.count('('),input_str.count(')')])
for _ in range(loop_num):
r1 = re.search(r'\([^()]*\)',out_put_str,re.S|re.I)
if r1 is not None:
r1 = r1.group()
r1_change = r1.replace(',','@#_#@#').replace('(','_@__##').replace(')','##__@_')
out_put_str = out_put_str.replace(r1,r1_change)
out_put_list = []
for line in out_put_str.split(','):
out_put_list.append(line.replace('@#_#@#',',').replace('_@__##','(').replace('##__@_',')'))
return out_put_list