Inserting files directly into an HBase table with bulkload, saving RegionServer resources



    Idea: convert the file into HFile format, then load the HFiles directly into HBase. This takes two steps: first convert the file into HFiles, then load the HFiles under that output path into the HBase table.

    Step 1: convert the file directly into HFile format
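    The mapper in the job below assumes every line of the input file is tab-separated into row key, name, and age. A hypothetical sample of the input (values made up for illustration only, columns separated by a tab character):

        0001	zhangsan	18
        0002	lisi	25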

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import java.io.IOException;

    public class hdfs_hfile extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            Connection connection = ConnectionFactory.createConnection(super.getConf());
            Table table = connection.getTable(TableName.valueOf("myuser2"));

            Job job = Job.getInstance(super.getConf(), "bulkload");
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input"));

            job.setMapperClass(mapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);

            // The job no longer writes Text files but HFiles, so use HFileOutputFormat2
            job.setOutputFormatClass(HFileOutputFormat2.class);
            // HFileOutputFormat2 must be configured against the target table, otherwise the job
            // fails with java.lang.Exception: java.lang.NullPointerException.
            // The Table and RegionLocator both come from the Connection.
            HFileOutputFormat2.configureIncrementalLoad(job, table,
                    connection.getRegionLocator(TableName.valueOf("myuser2")));
            HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node01:8020/output"));

            boolean b = job.waitForCompletion(true);
            return b ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
            int run = ToolRunner.run(configuration, new hdfs_hfile(), args);
            System.exit(run);
        }

        static class mapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Each input line is tab-separated: row key, name, age
                String[] split = value.toString().split("\t");
                ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
                immutableBytesWritable.set(Bytes.toBytes(split[0]));
                Put put = new Put(Bytes.toBytes(split[0]));
                put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
                put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
                context.write(immutableBytesWritable, put);
            }
        }
    }
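    HFileOutputFormat2.configureIncrementalLoad reads the region boundaries of the target table, so myuser2 must already exist with column family f1 before the job is submitted. Below is a minimal sketch of creating it through the Admin API; the class name CreateTable is made up, the table and family names come from the code above, and the HTableDescriptor/HColumnDescriptor calls assume an HBase 1.x-style client (which the use of LoadIncrementalHFiles later also suggests):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CreateTable {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Admin admin = connection.getAdmin()) {
                TableName name = TableName.valueOf("myuser2");
                if (!admin.tableExists(name)) {
                    // One column family "f1", matching the Put calls in the mapper above
                    HTableDescriptor descriptor = new HTableDescriptor(name);
                    descriptor.addFamily(new HColumnDescriptor("f1"));
                    admin.createTable(descriptor);
                }
            }
        }
    }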

    Step 2: load the HFiles under the previous step's output path into the HBase table

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class LOAD {
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

            Connection connection = ConnectionFactory.createConnection(configuration);
            Admin admin = connection.getAdmin();
            // Load into the same table the HFiles were prepared for in the previous step
            Table table = connection.getTable(TableName.valueOf("myuser2"));

            // Move the HFiles produced by the previous job into the regions of the target table
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
            load.doBulkLoad(new Path("hdfs://node01:8020/output"), admin, table,
                    connection.getRegionLocator(TableName.valueOf("myuser2")));

            connection.close();
        }
    }
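    doBulkLoad moves the HFiles into the table's regions directly, bypassing the WAL and memstore, which is where the resource saving in the title comes from; the rows are readable as soon as it returns. (The same step can also be run from the command line with the completebulkload tool shipped with HBase.) A quick sanity check is a Get against one of the row keys from the input file; a minimal sketch, where the row key "0001" is only an assumed example value:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class VerifyLoad {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table table = connection.getTable(TableName.valueOf("myuser2"))) {
                // "0001" is an assumed row key; use one that actually exists in your input data
                Result result = table.get(new Get(Bytes.toBytes("0001")));
                System.out.println("name = " + Bytes.toString(
                        result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))));
                System.out.println("age  = " + Bytes.toString(
                        result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age"))));
            }
        }
    }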