
HBase MapReduce (HBase: The Definitive Guide) + Importing into HBase with BulkLoad



Contents

HBase: The Definitive Guide, Chapter 7: Integration with MapReduce

Speculative execution and versions

HBase and MapReduce integration

Example 7.1: A MapReduce job that reads data from a file and writes it into an HBase table

Generating HFiles with MapReduce and importing them into HBase with BulkLoad

HBase: The Definitive Guide, Chapter 7: Integration with MapReduce

Speculative execution and versions

2.5.2.9. Speculative Execution

Speculative execution of MapReduce tasks is on by default. For HBase clusters it is generally recommended to switch speculative execution off at the system level, unless it is needed for a special case, in which case it can be configured per job. Set mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution to false.
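For example, a per-job override might look like the following sketch (the class name is illustrative; the mapred.* property names are the classic ones quoted above, which newer Hadoop releases expose as mapreduce.map.speculative and mapreduce.reduce.speculative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;

public class NoSpeculationJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Switch speculative execution off for this job only, so no duplicate
        // task attempts write the same data to HBase.
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        Job job = new Job(conf, "hbase-import-without-speculation");
        // ... set the mapper, input/output formats, then submit as usual ...
    }
}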

5.8. Versions

A {row, column, version} tuple identifies a cell in HBase. There may be many cells that share the same row and column; versions are what distinguish them.

Rows and column keys are expressed as byte arrays, while the version is a long integer. This long value is produced by java.util.Date.getTime() or System.currentTimeMillis(), i.e. the difference, measured in milliseconds, between the current time and midnight, January 1, 1970 UTC.
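As an illustration, the client API lets you supply that long explicitly when writing and request several versions when reading. A minimal sketch (the testtable table and data:json column are borrowed from the example below; the timestamps 1 and 2 are made up for the demo, and the column family must keep more than one version for both values to remain readable):

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class VersionsSketch {
    public static void main(String[] args) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection();
             Table table = conn.getTable(TableName.valueOf("testtable"))) {
            byte[] row = Bytes.toBytes("row1");
            byte[] family = Bytes.toBytes("data");
            byte[] qualifier = Bytes.toBytes("json");

            // Write two versions of the same cell with explicit timestamps.
            Put put = new Put(row);
            put.addColumn(family, qualifier, 1L, Bytes.toBytes("first"));
            put.addColumn(family, qualifier, 2L, Bytes.toBytes("second"));
            table.put(put);

            // Read them back, newest version first.
            Get get = new Get(row);
            get.setMaxVersions(3);
            Result result = table.get(get);
            for (Cell cell : result.getColumnCells(family, qualifier)) {
                System.out.println(cell.getTimestamp() + " -> "
                        + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}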

HBase and MapReduce integration

Example 7.1: A MapReduce job that reads data from a file and writes it into an HBase table

package com.gosun;

import org.apache.commons.cli.*;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class ImportFromFile {
    public static final String NAME = "ImportFromFile"; // job name, reused below
    public enum Counters { LINES }

    // Mapper class, extending the stock Hadoop Mapper.
    static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private byte[] family = null;
        private byte[] qualifier = null;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            String column = context.getConfiguration().get("conf.column");
            byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
            family = colkey[0];
            if (colkey.length > 1) {
                qualifier = colkey[1];
            }
        }

        // map() converts the key/value pairs provided by the InputFormat
        // into the types required by the OutputFormat.
        @Override
        public void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String lineString = line.toString();
            // The row key is the MD5 hash of the line's content.
            byte[] rowkey = DigestUtils.md5(lineString);
            Put put = new Put(rowkey);
            put.addColumn(family, qualifier, Bytes.toBytes(lineString));
            context.write(new ImmutableBytesWritable(rowkey), put);
            context.getCounter(Counters.LINES).increment(1);
        }
    }

    // Parse the command line parameters with Apache Commons CLI.
    private static CommandLine parseArgs(String[] args) throws ParseException {
        Options options = new Options();
        Option o = new Option("t", "table", true, "table to import into (must exist)");
        o.setArgName("table-name");
        o.setRequired(true);
        options.addOption(o);
        o = new Option("c", "column", true, "column to store row data into (must exist)");
        o.setArgName("family:qualifier");
        o.setRequired(true);
        options.addOption(o);
        o = new Option("i", "input", true, "the directory or file to read from");
        o.setArgName("path-in-HDFS");
        o.setRequired(true);
        options.addOption(o);
        options.addOption("d", "debug", false, "switch on DEBUG log level");
        CommandLineParser parser = new PosixParser();
        CommandLine cmd = null;
        try {
            cmd = parser.parse(options, args);
        } catch (Exception e) {
            System.err.println("ERROR: " + e.getMessage() + "\n");
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp(NAME + " ", options, true);
            System.exit(-1);
        }
        return cmd;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "www.migu-cdn-biz18.,www.migu-cdn-biz19.,www.migu-cdn-biz20.");
        conf.set("hbase.master", "www.migu-cdn-hadoop25.:60000");
        conf.set("hbase.client.keyvalue.maxsize", "2097152000");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        CommandLine cmd = parseArgs(otherArgs);
        String table = cmd.getOptionValue("t");
        String input = cmd.getOptionValue("i");
        String column = cmd.getOptionValue("c");
        conf.set("conf.column", column);

        Job job = new Job(conf, "Import from file " + input + " into table " + table);
        // Define the classes used by the job.
        job.setJarByClass(ImportFromFile.class);
        job.setMapperClass(ImportMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: the framework skips the reduce phase entirely.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The overridden setup() method of the ImportMapper class is called only once, when the framework initializes the class.

This example writes through the implicit write buffer provided by TableOutputFormat: when context.write() is called, the method internally hands the given Put instance to table.put(). Before the job ends, TableOutputFormat makes sure flushCommits() is called, so that any data still sitting in the write buffer is saved.
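For reference, a minimal sketch of that buffering done by hand with the classic HTable client API (the same API generation this example targets; hbase-site.xml is assumed to be on the classpath, and the table/column names are the ones from the import command below):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteBufferSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        table.setAutoFlush(false); // buffer puts client-side instead of one RPC per put
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("json"), Bytes.toBytes("{}"));
        table.put(put);       // queued in the write buffer, like context.write() in the mapper
        table.flushCommits(); // push whatever is still buffered, as TableOutputFormat does on close
        table.close();
    }
}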

Finally, note how the job gets its work done in the map phase without any reduce phase at all. This is quite typical of combined HBase and MapReduce jobs: because the data is stored in sorted tables and every row has a unique row key, the flow can avoid the more costly sort, shuffle, and reduce phases.

Create the HBase table:
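Assuming the testtable name and the data column family implied by the -c data:json option in the command below, the table can be created in the HBase shell, e.g.:

create 'testtable', 'data'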

Test data:

Run the command:

$JAVA_HOME/bin/java -classpath hbaseTest/merge-hbase-1.0-SNAPSHOT.jar com.gosun.ImportFromFile -t testtable -i test-data.txt -c data:json

Result in the HBase table:

Generating HFiles with MapReduce and importing them into HBase with BulkLoad

Introduction:

MapReduce jobs usually write to HBase through TableOutputFormat, building Put objects directly in the reducer. With large volumes of data this is inefficient: HBase throttles the writes and performs frequent flush, split, and compact operations, all heavy on I/O, and the load can hurt the stability of the RegionServers (long GC pauses slow down responses, nodes time out and drop from the cluster, setting off a chain reaction). HBase also supports a bulk load path. Because HBase keeps its data in HDFS in a specific on-disk format, a job can write persistent HFiles directly in HDFS and then move them into place, which imports huge amounts of data very quickly. Combined with MapReduce this is efficient and convenient, does not consume region resources or add extra serving load, greatly improves throughput for large writes, and reduces the write pressure on the HBase nodes.
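Besides the programmatic LoadIncrementalHFiles call used in the code below, the generated HFiles can also be loaded from the command line with HBase's completebulkload tool; a sketch, reusing the output directory and table name from the run command further down:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://migumaster/tmp/tmp1/ testtable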

Code:

BulkLoadToHBase.java

package com.gosun;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class BulkLoadToHBase {

    public static class ConvertWordCountOutToHFileMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String wordCountStr = value.toString();
            String[] wordCountArray = wordCountStr.split("\t");
            String word = wordCountArray[0];

            // Use the word as the HBase row key.
            byte[] rowKey = Bytes.toBytes(word);
            ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
            byte[] family = Bytes.toBytes("data");
            byte[] qualifier = Bytes.toBytes("count");
            // Note: the value written here is the word itself; use
            // wordCountArray[1] instead if the actual count should be stored.
            byte[] hbaseValue = Bytes.toBytes(word);
            // A Put can carry several columns of a family; for a single column
            // a KeyValue is sufficient and is what this job emits.
            KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
            // Put put = new Put(rowKey);
            // put.addColumn(family, qualifier, hbaseValue);
            context.write(rowKeyWritable, keyValue);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration hadoopConfiguration = new Configuration();
        hadoopConfiguration.set("fs.defaultFS", "hdfs://migumaster");
        hadoopConfiguration.set("dfs.nameservices", "migumaster");
        hadoopConfiguration.set("dfs.ha.namenodes.migumaster", "nn1,nn2");
        hadoopConfiguration.set("dfs.namenode.rpc-address.migumaster.nn1", "www.migu-cdn-biz18.:9000");
        hadoopConfiguration.set("dfs.namenode.rpc-address.migumaster.nn2", "www.migu-cdn-biz19.:9000");
        hadoopConfiguration.set("dfs.client.failover.proxy.provider.migumaster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();
        String htable = dfsArgs[0];    // target HBase table
        String inputPath = dfsArgs[1]; // source data path
        String outPath = dfsArgs[2];   // directory for the generated HFiles

        // If the HFile output directory already exists, delete it.
        FileSystem fileSystem = FileSystem.get(hadoopConfiguration);
        if (fileSystem.exists(new Path(outPath))) {
            fileSystem.delete(new Path(outPath), true);
        }

        // Only a Mapper is needed: it parses the job's input and converts
        // each record into the KeyValue form HBase expects.
        Job job = new Job(hadoopConfiguration, "wordCount_bulkload");
        job.setJarByClass(BulkLoadToHBase.class);
        job.setMapperClass(ConvertWordCountOutToHFileMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        // HBase configuration for the target cluster.
        Configuration hbaseConfiguration = HBaseConfiguration.create();
        hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181");
        hbaseConfiguration.set("hbase.zookeeper.quorum", "www.migu-cdn-biz18.,www.migu-cdn-biz19.,www.migu-cdn-biz20.");
        hbaseConfiguration.set("hbase.master", "www.migu-cdn-hadoop25.:60000");
        hbaseConfiguration.set("hbase.client.keyvalue.maxsize", "2097152000");

        // Target table; configureIncrementalLoad() wires in HFileOutputFormat2
        // and a partitioner matching the table's region boundaries.
        Connection HbaseConn = ConnectionFactory.createConnection(hbaseConfiguration);
        HTable table = (HTable) HbaseConn.getTable(TableName.valueOf(htable));
        HFileOutputFormat2.configureIncrementalLoad(job, table);

        // Run the job that writes the HFiles.
        int convertWordCountJobOutputToHFileJobResult = job.waitForCompletion(true) ? 0 : 1;

        // Once the job has finished successfully, bulk-load the result:
        // the first argument is the HFile directory, the second the target table.
        if (convertWordCountJobOutputToHFileJobResult == 0) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);
            loader.doBulkLoad(new Path(outPath), table);
        }
        System.exit(convertWordCountJobOutputToHFileJobResult);
    }
}

Run the command:

$JAVA_HOME/bin/java $JAVA_OPTS -classpath $CLASSPATH $APP_MAIN testtable hdfs://migumaster/tmp/test-data.txt hdfs://migumaster/tmp/tmp1/

Create the HBase table:

Test data:

Execution output:

References: HBase: The Definitive Guide

/m0_37739193/article/details/78781579
