hbase bulk load 小实践及一些总结

标签: hbase bulk load | 发表时间:2013-12-01 02:30 | 作者:u011689674
出处:http://blog.csdn.net

转载请注明出处: http://blog.csdn.net/lonelytrooper/article/details/17042391

很早就知道bulk load这个东西,也大致都清楚怎么回事,居然直到前几天才第一次实践...  害羞

这篇文章大致分为三个部分:

1. 使用hbase自带的importtsv工具

2. 自己实现写mr生成hfile并加载

3. bulk load本身及对依赖的第三方包的一些总结

第一部分:

导入的文件是data.txt,符合tsv格式,如下:


做一些准备工作:

a. 在hdfs上穿件/test目录,并将data.txt传至该目录下


b. 创建hbase表bl_tmp


c. 将依赖的jar加到$HADOOP_HOME/conf/hadoop-env.sh (每个人的不一定一样,加你需要的)


运行hbase自带的imprttsv工具,这里输出路径是output,列的定义由-Dimporttsv.columns指定:


程序正常运行,运行成功后,查看/output目录,output目录下会根据列族名生成一个自录,这里是d,d目录下为具体的hfile文件:


运行completebulkload工具将hfile装载到表bl_tmp中:


装载完之后,d目录下的hfile不存在了,这时查询bl_tmp表,如下:


第二部分:

源码直接贴了,简明扼要,没什么好说的... 关键的点详见前边两篇简要介绍相关源码的博文...

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public final class HBaseBulkLoadDemo extends Configured implements Tool {

	public static class BulkLoadDemoMapper extends
			Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

		private static final byte[] FAMILY_NAME = "d".getBytes();
		private static final byte[] COLUMN_A = "colA".getBytes();
		private static final byte[] COLUMN_B = "colB".getBytes();
		private static final byte[] COLUMN_C = "colC".getBytes();

		protected void map(LongWritable key, Text value, Context context) throws IOException,
				InterruptedException {
			String line = value.toString();
			String[] fields = line.split("\t");
			byte[] rowkeybytes = Bytes.toBytes(fields[0]);
			ImmutableBytesWritable rowkey = new ImmutableBytesWritable(rowkeybytes);
			Put put = new Put(rowkeybytes);
			put.add(FAMILY_NAME, COLUMN_A, fields[1].getBytes());
			put.add(FAMILY_NAME, COLUMN_B, fields[2].getBytes());
			put.add(FAMILY_NAME, COLUMN_C, fields[3].getBytes());
			context.write(rowkey, put);
		}

	}

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new HBaseBulkLoadDemo(), args));
	}

	public int run(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 3) {
			System.err.println("Usage: <tableName> <inputDir> <outputDir>");
			System.exit(2);
		}
		HTable table = new HTable(conf, otherArgs[0]);
		Job job = new Job(conf);
		job.setJarByClass(HBaseBulkLoadDemo.class);
		job.setJobName("HBaseBulkLoadDemo " + new Date());
		job.setMapperClass(BulkLoadDemoMapper.class);
		job.setReducerClass(PutSortReducer.class);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
		HFileOutputFormat.configureIncrementalLoad(job, table);
		return job.waitForCompletion(true) ? 0 : 1;
	}

}

程序用打包后,扔到集群上运行,为验证结果,注意先truncate掉bl_tmp表并删掉/output目录。

另外一点,这里运行自己打的包,如果你没有打依赖包的话,因为你用到hbase-version.jar,所以你需要把它加到HADOOP_CLASSPATH上:


运行自己打的jar包:


程序正确运行,查看/output下的输出:


将数据装在进bl_tmp仍然可以用completebulkload工具,或者你可以自己写一个工具,非常简单,就是构造一个LoadIncrementalHFile对象,并调用它的doBulkLoad方法就好了。 然后查看这时的bl_tmp(注意列名,与importtsv时不一样...):


第三部分:

关于bulkload本身:

1.这种方式适合初次导入,对于大数据量,效率非常可观,并且不需要表offline

2.目前貌似只适合每次对一个单列族导入..

3.数据量很大时,因为reduce个数与region个数对应,所以导数前记得对表进行预分区。

4.自己实现时,map阶段的输出只能是<ImmutableBytesWritable,KeyValue>或者<ImmutableBytesWritable,Put>,对应的reducer分别是KeyValueSortReducer和PutSortReducer。

关于hadoop对jar的加载方式及bulk load时第三方jar的一些说明,自己在实践的时候起初迷惑了很久,所以特意总结了下:

1.hadoop jar在运行时一定会将HADOOP_CLASSPATH加到CLASSPATH上(感兴趣可以cat hadoop看下),并且将hadoop jar运行的目标jar拷贝到子节点。
2.依赖的第三方jar,一般三种方式处理,要么-libjars,要么加到HADOOP_HOME/lib下(所有子节点),要么打包进目标jar。
3.运行hadop jar hbase-version.jar importtsv时,由于将依赖的jar加到了HADOOP_CLASSPATH,并且在主节点本地可以找到,所以依托TableMapReduceUtil.addDependencyJars方法的作用,依赖的第三方jar在运行时被作为分布式缓存拷贝到了子节点,程序得以正确运行。

完...  得意


作者:u011689674 发表于2013-11-30 18:30:33 原文链接
阅读:126 评论:0 查看评论

相关 [hbase bulk load] 推荐:

hbase bulk load 小实践及一些总结

- - CSDN博客互联网推荐文章
转载请注明出处: http://blog.csdn.net/lonelytrooper/article/details/17042391. 很早就知道bulk load这个东西,也大致都清楚怎么回事,居然直到前几天才第一次实践... . 这篇文章大致分为三个部分:. 使用hbase自带的importtsv工具.

DB2数据迁移之load

- - IT技术博客大学习
标签:   2数据迁移   DB   load.      一.load原理性知识.      1.为什么要使用LOAD.      load不需要写日志(或很少日志),不做检查约束和参照完整性约束,不触发Trigger,锁的时间比较短,因此特别适合大数据量的导入..      2.load过程分为4个阶段.

聊聊多线程程序的load balance

- - 搜索技术博客-淘宝
说起load balance,一般比较容易想到的是大型服务在多个replica之间的load balance、和kernal的load balance. 前者一般只是在流量入口做一下流量分配,逻辑相对简单;而后者则比较复杂,需要不断发现正在运行的各个进程之间的imbalance,然后通过将进程在CPU之间进行迁移,使得各个CPU都被充分利用起来.

Galera Load Balancer 0.9.0 正式版发布

- - 开源中国社区最新新闻
Galera Load Balancer 0.9.0 发布了,主要是引入 libglb.so ,可为你的 Linux 应用增加负载均衡能力,只需重载 libc 的 connect() 调用即可;同时提供可客户端到服务器端的直连,无处重新编译;增加了 round-robin 均衡策略. GLB (Galera Load Balancer) 是一个与 Pen 类似的 TCP 负载均衡器,它功能没有 Pen 那么强大,其主要的目的是做一个非常快速的 TCP 协议代理.

hibernate中get和load,find的区别

- - 企业架构 - ITeye博客
get和load方式是根据id取得一个记录. 下边详细说一下get和load的不同,因为有些时候为了对比也会把find加进来. 1.从返回结果上对比:. load方式检索不到的话会抛出org.hibernate.ObjectNotFoundException异常. get方法检索不到的话会返回null.

Hibernate中Session的get和load - 罗韬

- - 博客园_首页
hibernate中Session接口提供的get()和load()方法都是用来获取一个实体对象,在使用方式和查询性能上有一些区别. 测试版本:hibernate 4.2.0. Session接口提供了4个重载的get方法,分别通过“持久类+主键”和“全类名+主键”以及“锁选项”来获取实体对象. 向数据库发出一条sql查询语句,并返回结果.

hibernate中get和load方法的区别

- - 行业应用 - ITeye博客
    load方法检索不到数据抛出org.hibernate.ObjectNotFoundException异常,get方法检索不到数据返回null.     使用load方法,hibernate认为该ID对应的对象(数据库记录)在数据库中一定存在,在访问该对象非ID数据时,检索数据库,没有该对象记录,则抛出异常.

Linux - 系统指标 CPU load - 简书

- -
cpu load通常做为一个机器负载的衡量指标. cpu load是对使用或者等待cpu进程的统计(数量的累加). 每一个使用(using)或者等待(waiting)CPU的进程(process),都会使load值+1. 每一个结束的(teminates)进程,都会使load值-1. 所谓使用CPU的进程,是指状态为.

hbase介绍

- AreYouOK? - 淘宝数据平台与产品部官方博客 tbdata.org
hbase是bigtable的开源山寨版本. 是建立的hdfs之上,提供高可靠性、高性能、列存储、可伸缩、实时读写的数据库系统. 它介于nosql和RDBMS之间,仅能通过主键(row key)和主键的range来检索数据,仅支持单行事务(可通过hive支持来实现多表join等复杂操作). 主要用来存储非结构化和半结构化的松散数据.

Riak对比HBase

- - NoSQLFan
文章来自 Riak官方wiki,是一篇Riak与HBase的对比文章. Riak官方的对比通常都做得很中肯,并不刻意偏向自家产品. 对比的Riak版本是1.1.x,HBase是0.94.x. Riak 与 HBase 都是基于 Apache 2.0 licensed 发布. Riak 的实现是基于 Amazon 的 Dynamo 论文,HBase 是基于 Google 的 BigTable.