HBase BulkLoad批量写入数据实战 - 哥不是小萝莉 - 博客园

标签: | 发表时间:2020-02-09 19:17 | 作者:
出处:https://www.cnblogs.com

1.概述

在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等)。今天这篇博客笔者将为大家分享使用HBase BulkLoad的方式来进行海量数据批量写入到HBase集群。

2.内容

在使用BulkLoad之前,我们先来了解一下HBase的存储机制。HBase存储数据其底层使用的是HDFS来作为存储介质,HBase的每一张表对应的HDFS目录上的一个文件夹,文件夹名以HBase表进行命名(如果没有使用命名空间,则默认在default目录下),在表文件夹下存放在若干个Region命名的文件夹,Region文件夹中的每个列簇也是用文件夹进行存储的,每个列簇中存储就是实际的数据,以HFile的形式存在。路径格式如下:

/hbase/data/default/<tbl_name>/<region_id>/<cf>/<hfile_id>

2.1 实现原理

按照HBase存储数据按照HFile格式存储在HDFS的原理,使用MapReduce直接生成HFile格式的数据文件,然后在通过RegionServer将HFile数据文件移动到相应的Region上去。流程如下图所示:

2.2. 生成HFile文件

HFile文件的生成,可以使用MapReduce来进行实现,将数据源准备好,上传到HDFS进行存储,然后在程序中读取HDFS上的数据源,进行自定义封装,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。实现代码如下:

/**
 * Read DataSource from hdfs & Gemerator hfile.
 * 
 * @author smartloli.
 *
 *         Created by Aug 19, 2018
 */
public class GemeratorHFile2 {
    static class HFileImportMapper2 extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        
        protected final String CF_KQ = "cf";

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println("line : " + line);
            String[] datas = line.split(" ");
            String row = new Date().getTime() + "_" + datas[1];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(row));
            KeyValue kv = new KeyValue(Bytes.toBytes(row), this.CF_KQ.getBytes(), datas[1].getBytes(), datas[2].getBytes());
            context.write(rowkey, kv);
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("<Usage>Please input hbase-site.xml path.</Usage>");
            return;
        }
        Configuration conf = new Configuration();
        conf.addResource(new Path(args[0]));
        conf.set("hbase.fs.tmp.dir", "partitions_" + UUID.randomUUID());
        String tableName = "person";
        String input = "hdfs://nna:9000/tmp/person.txt";
        String output = "hdfs://nna:9000/tmp/pres";
        System.out.println("table : " + tableName);
        HTable table;
        try {
            try {
                FileSystem fs = FileSystem.get(URI.create(output), conf);
                fs.delete(new Path(output), true);
                fs.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            Connection conn = ConnectionFactory.createConnection(conf);
            table = (HTable) conn.getTable(TableName.valueOf(tableName));
            Job job = Job.getInstance(conf);
            job.setJobName("Generate HFile");

            job.setJarByClass(GemeratorHFile2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HFileImportMapper2.class);
            FileInputFormat.setInputPaths(job, input);
            FileOutputFormat.setOutputPath(job, new Path(output));

            HFileOutputFormat2.configureIncrementalLoad(job, table);
            try {
                job.waitForCompletion(true);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

在HDFS目录/tmp/person.txt中,准备数据源如下:

1 smartloli 100
2 smartloli2 101
3 smartloli3 102

然后,将上述代码编译打包成jar,上传到Hadoop集群进行执行,执行命令如下:

hadoop jar GemeratorHFile2.jar /data/soft/new/apps/hbaseapp/hbase-site.xml

如果在执行命令的过程中,出现找不到类的异常信息,可能是本地没有加载HBase依赖JAR包,在当前用户中配置如下环境变量信息:

export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:classpath

然后,执行source命令使配置的内容立即生生效。

2.3. 执行预览

在成功提交任务后,Linux控制台会打印执行任务进度,也可以到YARN的资源监控界面查看执行进度,结果如下所示:

等待任务的执行,执行完成后,在对应HDFS路径上会生成相应的HFile数据文件,如下图所示:

2.4 使用BulkLoad导入到HBase

然后,在使用BulkLoad的方式将生成的HFile文件导入到HBase集群中,这里有2种方式。一种是写代码实现导入,另一种是使用HBase命令进行导入。

2.4.1 代码实现导入

通过LoadIncrementalHFiles类来实现导入,具体代码如下:

/**
* Use BulkLoad inport hfile from hdfs to hbase.
* 
* @author smartloli.
*
* Created by Aug 19, 2018
*/
public class BulkLoad2HBase {

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("<Usage>Please input hbase-site.xml path.</Usage>");
            return;
        }
        String output = "hdfs://cluster1/tmp/pres";
        Configuration conf = new Configuration();
        conf.addResource(new Path(args[0]));
        HTable table = new HTable(conf, "person");
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(output), table);
    }
    
}

执行上述代码,运行结果如下:

2.4.2 使用HBase命令进行导入

先将生成好的HFile文件迁移到目标集群(即HBase集群所在的HDFS上),然后在使用HBase命令进行导入,执行命令如下:

# 先使用distcp迁移hfile
hadoop distcp -Dmapreduce.job.queuename=queue_1024_01 -update -skipcrccheck -m 10 /tmp/pres hdfs://nns:9000/tmp/pres

# 使用bulkload方式导入数据
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/pres person

最后,我们可以到指定的RegionServer节点上查看导入的日志信息,如下所示为导入成功的日志信息:

2018-08-19 16:30:34,969 INFO  [B.defaultRpcServer.handler=7,queue=1,port=16020] regionserver.HStore: Successfully loaded store file hdfs://cluster1/tmp/pres/cf/7b455535f660444695589edf509935e9 into store cf (new location: hdfs://cluster1/hbase/data/default/person/2d7483d4abd6d20acdf16533a3fdf18f/cf/d72c8846327d42e2a00780ac2facf95b_SeqId_4_)

2.5 验证

使用BulkLoad方式导入数据后,可以进入到HBase集群,使用HBase Shell来查看数据是否导入成功,预览结果如下:

3.总结

本篇博客为了演示实战效果,将生成HFile文件和使用BulkLoad方式导入HFile到HBase集群的步骤进行了分解,实际情况中,可以将这两个步骤合并为一个,实现自动化生成与HFile自动导入。如果在执行的过程中出现RpcRetryingCaller的异常,可以到对应RegionServer节点查看日志信息,这里面记录了出现这种异常的详细原因。

4.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

相关 [hbase bulkload 数据] 推荐:

HBase 写优化之 BulkLoad 实现数据快速入库

- - 数据库 - ITeye博客
1、为何要 BulkLoad 导入. 传统的 HTableOutputFormat 写 HBase 有什么问题. 2、bulkload 流程与实践. 1、为何要 BulkLoad 导入. 传统的 HTableOutputFormat 写 HBase 有什么问题. 我们先看下 HBase 的写流程:.

HBase BulkLoad批量写入数据实战 - 哥不是小萝莉 - 博客园

- -
在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等. 这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等).

HBASE数据架构

- - 数据库 - ITeye博客
关系数据库一般用B+树,HBASE用的是LSM树. MYSQL所用类B+树一般深度不超过3层,数据单独存放,在B+树的叶节点存储指向实际数据的指针,叶节点之间也相互关联,类似双向链表. 这种结构的特点是数据更新或写入导致数据页表分散,不利于顺序访问. LSM存储中,各个文件的结构类似于B+树,但是分多个存在内存或磁盘中,更新和写入变成了磁盘的顺序写,只在合并时去掉重复或过时的数据.

hbase写数据过程

- - 数据库 - ITeye博客
博文说明:1、研究版本hbase0.94.12;2、贴出的源代码可能会有删减,只保留关键的代码. 从client和server两个方面探讨hbase的写数据过程.     写数据主要是HTable的单条写和批量写两个API,源码如下:. hbase写数据的客户端核心方法是HConnectionManager的processBatchCallback方法,相关源码如下:.

HBase数据查询之Coprocessor

- - 开源软件 - ITeye博客
协处理器的概念、作用和类型不介绍,可以参看:http://www.cnblogs.com/ventlam/archive/2012/10/30/2747024.html,官方blog:https://blogs.apache.org/hbase/entry/coprocessor_introduction.

从hbase(hive)将数据导出到mysql

- - CSDN博客云计算推荐文章
在上一篇文章《 用sqoop进行mysql和hdfs系统间的数据互导》中,提到sqoop可以让RDBMS和HDFS之间互导数据,并且也支持从mysql中导入到HBase,但从HBase直接导入mysql则不是直接支持,而是间接支持. 要么将HBase导出到HDFS平面文件,要么将其导出到Hive中,再导出到mysql.

通过HBase Observer同步数据到ElasticSearch

- - SegmentFault 最新的文章
Observer希望解决的问题. HBase是一个分布式的存储体系,数据按照RowKey分成不同的Region,再分配给RegionServer管理. 但是RegionServer只承担了存储的功能,如果Region能拥有一部分的计算能力,从而实现一个HBase框架上的MapReduce,那HBase的操作性能将进一步提升.

HBase基本数据操作详解

- - 互联网 - ITeye博客
之前详细写了一篇HBase过滤器的文章,今天把基础的表和数据相关操作补上. 本文档 参考最新 (截止2014年7月16日)的 官方 Ref Guide、 Developer API编写. 所有代码均基于“hbase  0.96.2-hadoop2 ”版本编写,均实测通过. 对于建表,和RDBMS类似,HBase也有namespace的概念,可以指定表空间创建表,也可以直接创建表,进入default表空间.

JAVA操作HBASE数据操作详解

- -
Hbase对于建表,和RDBMS类似,HBase也有namespace的概念,可以指定表空间创建表,也可以直接创建表,进入default表空间. 对于数据操作,HBase支持四类主要的数据操作,分别是:. Put :增加一行,修改一行;. Delete :删除一行,删除指定列族,删除指定column的多个版本,删除指定column的制定版本等;.

两次hbase丢失数据的故障及原因分析

- gengmao - 蓝色时分
    hbase的稳定性是近期社区的重要关注点,毕竟稳定的系统才能被推广开来,这里有几次稳定性故障和大家分享.     第一次生产故障的现象及原因. 1 hbase发现无法写入 2 通过hbck检测发现.META.表中出现空洞,具体log是:;Chain of regions in table.     修复方法:直接使用check_meta.rb重新生成.META.表并修补空洞,但是会引起数据丢失.