1000字范文,内容丰富有趣,学习的好帮手!
1000字范文 > HBase快速导入数据–BulkLoad

HBase快速导入数据–BulkLoad

时间:2023-03-11 18:11:13

相关推荐

HBase快速导入数据–BulkLoad

数据库|mysql教程

HBase导入数据,BulkLoad,HBase快速导入数据

数据库-mysql教程

一文鸡源码开发,运行vscode需要网络么,htpc选择Ubuntu,上次图片到tomcat,爬虫软件平台,php算术验证码,新闻seo系统哪家服务好,美食网站jsp,淘宝 页面模板lzw

Apache HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。但是怎样有效的将数据导入到HBase呢?

影视源码 三级分销,ubuntu系统虚拟镜像,tomcat文件夹的含义,插件式爬虫软件,php网站外包收费标准学习,SEO攻略杨帆lzw

vb制作定时关机源码,vscode 禁止自动转码,ubuntu 安装 分去,tomcat 网站日志,sqlite外键约束的用法,南京万德爬虫岗位怎么样,php 代码发布系统,舟山seo推广价格优惠,discuz汽车门户网站,.net 注册页面模板lzw

Apache HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。但是怎样有效的将数据导入到HBase呢?HBase有多种导入数据的方法,最直接的方法就是在MapReduce作业中使用TableOutputFormat作为输出,或者使用标准的客户端API,但是这些都不是非常有效的方法。

Bulkload利用MapReduce作业输出HBase内部数据格式的表数据,然后将生成的StoreFiles直接导入到集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。

Bulkload过程主要包括三部分:

1.从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS

这一步不在HBase的考虑范围内,不管数据源是什么,,只要在进行下一步之前将数据上传到HDFS即可。

2.利用一个MapReduce作业准备数据

这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce函数不需要我们考虑,由HBase提供。该作业需要使用rowkey(行键)作为输出Key,KeyValue、Put或者Delete作为输出Value。MapReduce作业需要使用HFileOutputFormat2来生成HBase数据文件。为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区域中。为了达到这个目的,MapReduce作业会使用Hadoop的TotalOrderPartitioner类根据表的key值将输出分割开来。HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。

3.告诉RegionServers数据的位置并导入数据

这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导入到相应的区域。

下图简单明确的说明了整个过程

图片来自How-to: Use HBase Bulk Loading, and Why

Note:在进行BulkLoad之前,要在HBase中创建与程序中同名且结构相同的空表

Java实现如下:

BulkLoadDriver.java

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.Connection;

import org.apache.hadoop.hbase.client.ConnectionFactory;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

/**

* Created by shaobo on 15-6-9.

*/

public class BulkLoadDriver extends Configured implements Tool {

private static final String DATA_SEPERATOR = “\\s+”;

private static final String TABLE_NAME = “temperature”;//表名

private static final String COLUMN_FAMILY_1=”date”;//列组1

private static final String COLUMN_FAMILY_2=”tempPerHour”;//列组2

public static void main(String[] args) {

try {

int response = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args);

if(response == 0) {

System.out.println(“Job is successfully completed…”);

} else {

System.out.println(“Job failed…”);

}

} catch(Exception exception) {

exception.printStackTrace();

}

}

public int run(String[] args) throws Exception {

String outputPath = args[1];

/**

* 设置作业参数

*/

Configuration configuration = getConf();

configuration.set(“data.seperator”, DATA_SEPERATOR);

configuration.set(“hbase.table.name”, TABLE_NAME);

configuration.set(“COLUMN_FAMILY_1”, COLUMN_FAMILY_1);

configuration.set(“COLUMN_FAMILY_2”, COLUMN_FAMILY_2);

Job job = Job.getInstance(configuration, “Bulk Loading HBase Table::” + TABLE_NAME);

job.setJarByClass(BulkLoadDriver.class);

job.setInputFormatClass(TextInputFormat.class);

job.setMapOutputKeyClass(ImmutableBytesWritable.class);//指定输出键类

job.setMapOutputValueClass(Put.class);//指定输出值类

job.setMapperClass(BulkLoadMapper.class);//指定Map函数

FileInputFormat.addInputPaths(job, args[0]);//输入路径

FileSystem fs = FileSystem.get(configuration);

Path output = new Path(outputPath);

if (fs.exists(output)) {

fs.delete(output, true);//如果输出路径存在,就将其删除

}

FileOutputFormat.setOutputPath(job, output);//输出路径

Connection connection = ConnectionFactory.createConnection(configuration);

TableName tableName = TableName.valueOf(TABLE_NAME);

HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));

job.waitForCompletion(true);

if (job.isSuccessful()){

HFileLoader.doBulkLoad(outputPath, TABLE_NAME);//导入数据

return 0;

} else {

return 1;

}

}

}

BulkLoadMapper.java

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。