Hbase调用JavaAPI实现批量导入操作

标签: hbase javaapi | 发表时间:2014-10-14 14:33 | 作者:foxtosky
分享到:
出处:http://blog.csdn.net

将手机上网日志文件批量导入到Hbase中,操作步骤:

1、将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop  fs -put input  /


 

2、创建Hbase表,通过Java操作

 

Java代码   收藏代码
  1. package com.jiewen.hbase;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.hbase.HBaseConfiguration;  
  7. import org.apache.hadoop.hbase.HColumnDescriptor;  
  8. import org.apache.hadoop.hbase.HTableDescriptor;  
  9. import org.apache.hadoop.hbase.client.Get;  
  10. import org.apache.hadoop.hbase.client.HBaseAdmin;  
  11. import org.apache.hadoop.hbase.client.HTable;  
  12. import org.apache.hadoop.hbase.client.Put;  
  13. import org.apache.hadoop.hbase.client.Result;  
  14. import org.apache.hadoop.hbase.client.ResultScanner;  
  15. import org.apache.hadoop.hbase.client.Scan;  
  16. import org.apache.hadoop.hbase.util.Bytes;  
  17.   
  18. public class HbaseDemo {  
  19.   
  20.     public static void main(String[] args) throws IOException {  
  21.         String tableName = "wlan_log";  
  22.         String columnFamily = "cf";  
  23.   
  24.         HbaseDemo.create(tableName, columnFamily);  
  25.   
  26.         // HbaseDemo.put(tableName, "row1", columnFamily, "cl1", "data");  
  27.         // HbaseDemo.get(tableName, "row1");  
  28.         // HbaseDemo.scan(tableName);  
  29.         // HbaseDemo.delete(tableName);  
  30.     }  
  31.   
  32.     // hbase操作必备  
  33.     private static Configuration getConfiguration() {  
  34.         Configuration conf = HBaseConfiguration.create();  
  35.         conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");  
  36.         // 使用eclipse时必须添加这个,否则无法定位  
  37.         conf.set("hbase.zookeeper.quorum", "hadoop1");  
  38.         return conf;  
  39.     }  
  40.   
  41.     // 创建一张表  
  42.     public static void create(String tableName, String columnFamily)  
  43.             throws IOException {  
  44.         HBaseAdmin admin = new HBaseAdmin(getConfiguration());  
  45.         if (admin.tableExists(tableName)) {  
  46.             System.out.println("table exists!");  
  47.         } else {  
  48.             HTableDescriptor tableDesc = new HTableDescriptor(tableName);  
  49.             tableDesc.addFamily(new HColumnDescriptor(columnFamily));  
  50.             admin.createTable(tableDesc);  
  51.             System.out.println("create table success!");  
  52.         }  
  53.     }  
  54.   
  55.     // 添加一条记录  
  56.     public static void put(String tableName, String row, String columnFamily,  
  57.             String column, String data) throws IOException {  
  58.         HTable table = new HTable(getConfiguration(), tableName);  
  59.         Put p1 = new Put(Bytes.toBytes(row));  
  60.         p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes  
  61.                 .toBytes(data));  
  62.         table.put(p1);  
  63.         System.out.println("put'" + row + "'," + columnFamily + ":" + column  
  64.                 + "','" + data + "'");  
  65.     }  
  66.   
  67.     // 读取一条记录  
  68.     public static void get(String tableName, String row) throws IOException {  
  69.         HTable table = new HTable(getConfiguration(), tableName);  
  70.         Get get = new Get(Bytes.toBytes(row));  
  71.         Result result = table.get(get);  
  72.         System.out.println("Get: " + result);  
  73.     }  
  74.   
  75.     // 显示所有数据  
  76.     public static void scan(String tableName) throws IOException {  
  77.         HTable table = new HTable(getConfiguration(), tableName);  
  78.         Scan scan = new Scan();  
  79.         ResultScanner scanner = table.getScanner(scan);  
  80.         for (Result result : scanner) {  
  81.             System.out.println("Scan: " + result);  
  82.         }  
  83.     }  
  84.   
  85.     // 删除表  
  86.     public static void delete(String tableName) throws IOException {  
  87.         HBaseAdmin admin = new HBaseAdmin(getConfiguration());  
  88.         if (admin.tableExists(tableName)) {  
  89.             try {  
  90.                 admin.disableTable(tableName);  
  91.                 admin.deleteTable(tableName);  
  92.             } catch (IOException e) {  
  93.                 e.printStackTrace();  
  94.                 System.out.println("Delete " + tableName + " 失败");  
  95.             }  
  96.         }  
  97.         System.out.println("Delete " + tableName + " 成功");  
  98.     }  
  99.   
  100. }  

 

3、将日志文件导入Hbase表wlan_log中:

 

Java代码   收藏代码
  1. import java.text.SimpleDateFormat;  
  2. import java.util.Date;  
  3.   
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.hbase.client.Put;  
  6. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  
  7. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
  8. import org.apache.hadoop.hbase.util.Bytes;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.NullWritable;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.mapreduce.Counter;  
  13. import org.apache.hadoop.mapreduce.Job;  
  14. import org.apache.hadoop.mapreduce.Mapper;  
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  17.   
  18. public class HbaseBatchImport {  
  19.   
  20.     public static void main(String[] args) throws Exception {  
  21.         final Configuration configuration = new Configuration();  
  22.         // 设置zookeeper  
  23.         configuration.set("hbase.zookeeper.quorum", "hadoop1");  
  24.   
  25.         // 设置hbase表名称  
  26.         configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");  
  27.   
  28.         // 将该值改大,防止hbase超时退出  
  29.         configuration.set("dfs.socket.timeout", "180000");  
  30.   
  31.         final Job job = new Job(configuration, "HBaseBatchImport");  
  32.   
  33.         job.setMapperClass(BatchImportMapper.class);  
  34.         job.setReducerClass(BatchImportReducer.class);  
  35.         // 设置map的输出,不设置reduce的输出类型  
  36.         job.setMapOutputKeyClass(LongWritable.class);  
  37.         job.setMapOutputValueClass(Text.class);  
  38.   
  39.         job.setInputFormatClass(TextInputFormat.class);  
  40.         // 不再设置输出路径,而是设置输出格式类型  
  41.         job.setOutputFormatClass(TableOutputFormat.class);  
  42.   
  43.         FileInputFormat.setInputPaths(job, "hdfs://hadoop1:9000/input");  
  44.   
  45.         job.waitForCompletion(true);  
  46.     }  
  47.   
  48.     static class BatchImportMapper extends  
  49.             Mapper<LongWritable, Text, LongWritable, Text> {  
  50.         SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");  
  51.         Text v2 = new Text();  
  52.   
  53.         protected void map(LongWritable key, Text value, Context context)  
  54.                 throws java.io.IOException, InterruptedException {  
  55.             final String[] splited = value.toString().split("\t");  
  56.             try {  
  57.                 final Date date = new Date(Long.parseLong(splited[0].trim()));  
  58.                 final String dateFormat = dateformat1.format(date);  
  59.                 String rowKey = splited[1] + ":" + dateFormat;  
  60.                 v2.set(rowKey + "\t" + value.toString());  
  61.                 context.write(key, v2);  
  62.             } catch (NumberFormatException e) {  
  63.                 final Counter counter = context.getCounter("BatchImport",  
  64.                         "ErrorFormat");  
  65.                 counter.increment(1L);  
  66.                 System.out.println("出错了" + splited[0] + " " + e.getMessage());  
  67.             }  
  68.         };  
  69.     }  
  70.   
  71.     static class BatchImportReducer extends  
  72.             TableReducer<LongWritable, Text, NullWritable> {  
  73.         protected void reduce(LongWritable key,  
  74.                 java.lang.Iterable<Text> values, Context context)  
  75.                 throws java.io.IOException, InterruptedException {  
  76.             for (Text text : values) {  
  77.                 final String[] splited = text.toString().split("\t");  
  78.   
  79.                 final Put put = new Put(Bytes.toBytes(splited[0]));  
  80.                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes  
  81.                         .toBytes(splited[1]));  
  82.                 // 省略其他字段,调用put.add(....)即可  
  83.                 context.write(NullWritable.get(), put);  
  84.             }  
  85.         };  
  86.     }  
  87.   
  88. }  

 4、查看导入结果:



 

作者:foxtosky 发表于2014-10-14 14:33:35 原文链接
阅读:137 评论:0 查看评论

相关 [hbase javaapi] 推荐:

Hbase调用JavaAPI实现批量导入操作

- - CSDN博客云计算推荐文章
将手机上网日志文件批量导入到Hbase中,操作步骤:. 1、将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop  fs -put input  /. 2、创建Hbase表,通过Java操作.     // hbase操作必备  .         // 使用eclipse时必须添加这个,否则无法定位  .

elasticsearch的javaAPI之query

- - CSDN博客云计算推荐文章
elasticsearch的javaAPI之query API. the Search API允许执行一个搜索查询,返回一个与查询匹配的结果(hits). 它可以在跨一个或多个index上执行, 或者一个或多个types. 查询可以使用提供的 query Java API 或filter Java API.

hbase介绍

- AreYouOK? - 淘宝数据平台与产品部官方博客 tbdata.org
hbase是bigtable的开源山寨版本. 是建立的hdfs之上,提供高可靠性、高性能、列存储、可伸缩、实时读写的数据库系统. 它介于nosql和RDBMS之间,仅能通过主键(row key)和主键的range来检索数据,仅支持单行事务(可通过hive支持来实现多表join等复杂操作). 主要用来存储非结构化和半结构化的松散数据.

Riak对比HBase

- - NoSQLFan
文章来自 Riak官方wiki,是一篇Riak与HBase的对比文章. Riak官方的对比通常都做得很中肯,并不刻意偏向自家产品. 对比的Riak版本是1.1.x,HBase是0.94.x. Riak 与 HBase 都是基于 Apache 2.0 licensed 发布. Riak 的实现是基于 Amazon 的 Dynamo 论文,HBase 是基于 Google 的 BigTable.

[转]HBase简介

- - 小鸥的博客
   Hbase是一个分布式开源数据库,基于Hadoop分布式文件系统,模仿并提供了基于Google文件系统的Bigtable数据库的所有功能. 其目标是处理非常庞大的表,可以用普通的计算机处理超过10亿行数据,并且有数百万列元素组成的数据表. Hbase可以直接使用本地文件系统或者Hadoop作为数据存储方式,不过为了提高数据可靠性和系统的健壮性,发挥Hbase处理大数据量等功能,需要使用Hadoop作为文件系统.

HBase Memstore配置

- - 行业应用 - ITeye博客
HBase Memstore配置. 本文为翻译,原英文地址:http://blog.sematext.com/2012/07/16/hbase-memstore-what-you-should-know/.     当regionserver(以下简称RS)收到一个写请求,会将这个请求定位到某个特定的region.

HBase表设计

- - 互联网 - ITeye博客
默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候,所有的HBase客户端都向这一个region写数据, 直到这 个region足够大了才进行切分. 一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按 照 region分区情况,在集群内做数据的负载均衡.

hbase锁机制

- - 数据库 - ITeye博客
博文说明:1、研究版本hbase0.94.12;2、贴出的源代码可能会有删减,只保留关键的代码.   hbase的锁是采用jdk的ReentrantReadWriteLock类实现.   一、HRegion有两种锁:lock、updatesLock,这两种锁均是ReentrantReadWriteLock类的实例,基本上所有的region操作均需要获取lock的read共享锁,在获取了lock的read锁后,如果是增加或者删除等影响数据内容的操作则还需要获取updatesLock的read锁.

Hbase入门

- - CSDN博客云计算推荐文章
Hbase 全称是Hadoop DataBase ,是一种开源的,可伸缩的,高可靠,高性能,面向列的分布式存储系统. 类似于Google的BigTable,其分布式计算采用MapReduce,通过MapReduce完成大块数据加载和全表扫描操作. 文件存储系统是HDFS,通过Zookeeper来完成状态管理协同服务.

hbase原理

- - CSDN博客云计算推荐文章
1.hbase利用hdfs作为其文件存储系统,利用mapreduce来处理数据,利用zookeeper作为协调工具. 2.行键(row key),类似于主键,但row key是表自带的. 3.列族(column family) ,列(也称作标签/修饰符)的集合,定义表的时候指定的,列是在插入记录的时候动态增加的.