Loading files directly into an HBase table with bulkload
Approach: convert the file into HFile format, then load the HFiles directly into HBase. This takes two steps: first convert the file into HFiles, then load the HFiles from that output path into the HBase table.

Step 1: Convert the file into HFile format
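The MapReduce job below reads plain text from /input and writes HFiles to /output. The source does not show the input data, but the mapper implies each line carries three tab-separated fields: row key, name, age. A hypothetical /input file might look like:

0001	zhangsan	18
0002	lisi	25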
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class hdfs_hfile extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Connection connection = ConnectionFactory.createConnection(super.getConf());
        Table table = connection.getTable(TableName.valueOf("myuser2"));

        Job job = Job.getInstance(super.getConf(), "bulkload");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input"));
        job.setMapperClass(mapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // The output is no longer a text file but HFiles, so use HFileOutputFormat2.
        job.setOutputFormatClass(HFileOutputFormat2.class);
        // HFileOutputFormat2 must be configured against the target table, otherwise the job
        // fails with java.lang.Exception: java.lang.NullPointerException.
        // The Table and RegionLocator come from the Connection created above.
        HFileOutputFormat2.configureIncrementalLoad(job, table,
                connection.getRegionLocator(TableName.valueOf("myuser2")));
        HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node01:8020/output"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        int run = ToolRunner.run(configuration, new hdfs_hfile(), args);
        System.exit(run);
    }

    static class mapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is tab-separated: row key, name, age.
            String[] split = value.toString().split("\t");
            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
            immutableBytesWritable.set(Bytes.toBytes(split[0]));
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(split[2]));
            context.write(immutableBytesWritable, put);
        }
    }
}
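Note that configureIncrementalLoad requires the target table to already exist with the column family the mapper writes to. If myuser2 has not been created yet, a minimal setup sketch using the HBase 2.x admin API (the class name CreateTable is hypothetical; table and family names follow the code above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateTable {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            // Create "myuser2" with the single column family "f1" used by the mapper.
            admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("myuser2"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
                    .build());
        }
    }
}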
Step 2: Load the HFiles under the previous step's output path into the HBase table
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LOAD {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();
        // Load into the same table the HFiles were generated for ("myuser2"):
        // configureIncrementalLoad partitioned the output by that table's region boundaries.
        Table table = connection.getTable(TableName.valueOf("myuser2"));
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("hdfs://node01:8020/output"), admin, table,
                connection.getRegionLocator(TableName.valueOf("myuser2")));
    }
}
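In newer HBase releases (2.2+), LoadIncrementalHFiles is deprecated in favor of org.apache.hadoop.hbase.tool.BulkLoadHFiles, which resolves the table's regions itself, so no Admin, Table, or RegionLocator is needed. A minimal sketch on that API (the class name LoadNew is hypothetical; paths and table name match the code above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class LoadNew {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        // Point the loader at the HFile output directory; it assigns files to regions itself.
        BulkLoadHFiles.create(configuration)
                .bulkLoad(TableName.valueOf("myuser2"), new Path("hdfs://node01:8020/output"));
    }
}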