Merging Data from Multiple HBase Tables

    Tech · 2022-07-29

    1. Spark implementation: Spark reads from HBase through TableInputFormat with scan conditions (a time range or a row-key range) to fetch the required data, then writes it into the target HBase table.
    Bulk-load approach, implemented in Scala:
    package com.cbp.hbaseTableMerge

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, TableInputFormat}
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    import scala.collection.mutable.ArrayBuffer

    object HbaseMergeBulk2 {
      // set the log level
      Logger.getLogger("org").setLevel(Level.INFO)

      def main(args: Array[String]): Unit = {
        // external arguments: source table, target table, scan start/stop timestamps,
        // temporary HFile path, repartition count, column family
        val readTable = args(0)
        val writeTable = args(1)
        val startTime = args(2)
        val stopTime = args(3)
        val filePath = args(4)
        val partition = args(5)
        val columnf = args(6)

        // create the Spark SQL entry point, SparkSession
        val ss = SparkSession.builder().getOrCreate()

        // HBase configuration: source table, scan time range, target table for HFile output
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set(TableInputFormat.INPUT_TABLE, readTable)
        hbaseConf.set(TableInputFormat.SCAN_TIMERANGE_START, startTime)
        hbaseConf.set(TableInputFormat.SCAN_TIMERANGE_END, stopTime)
        hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name", writeTable)
        hbaseConf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000)

        // HBase connection, target table and its region locator
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val regionLocator = conn.getRegionLocator(TableName.valueOf(writeTable))
        val table = conn.getTable(TableName.valueOf(writeTable))

        // job configuration for HFileOutputFormat2
        val job = Job.getInstance(hbaseConf)
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setMapOutputValueClass(classOf[KeyValue])
        HFileOutputFormat2.configureIncrementalLoadMap(job, table.getDescriptor)

        // stage 1: read the source table and cache it
        val rdd = ss.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])
          .repartition(Integer.valueOf(partition))
          .persist(StorageLevel.MEMORY_AND_DISK_SER)

        // stage 2: explode each Result into (rowKey, (family, qualifier, value)) tuples
        val rdd1 = rdd.flatMap(t => {
          val values = ArrayBuffer[(String, (String, String, String))]()
          val cols = ArrayBuffer[String]()
          val colsMap = t._2.getFamilyMap(Bytes.toBytes(columnf))
          import scala.collection.JavaConversions._
          for (entry <- colsMap.entrySet()) {
            cols.append(Bytes.toString(entry.getKey))
          }
          cols.foreach(col => {
            values.append((Bytes.toString(t._2.getRow),
              (columnf, col, Bytes.toString(t._2.getValue(Bytes.toBytes(columnf), Bytes.toBytes(col))))))
          })
          values
        }).persist(StorageLevel.MEMORY_AND_DISK_SER)
        rdd.unpersist()

        // stage 3: sort by (rowKey, family, qualifier) as HFileOutputFormat2 requires,
        // then write the HFiles to the temporary path
        rdd1.sortBy(x => (x._1, x._2._1, x._2._2))
          .map(rdd => {
            val rowKey = Bytes.toBytes(rdd._1)
            val family = Bytes.toBytes(rdd._2._1)
            val colum = Bytes.toBytes(rdd._2._2)
            val value = Bytes.toBytes(rdd._2._3)
            (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, colum, value))
          }).saveAsNewAPIHadoopFile(filePath, classOf[ImmutableBytesWritable], classOf[KeyValue],
            classOf[HFileOutputFormat2], hbaseConf)

        // bulk-load the generated HFiles into the target table
        val load = new LoadIncrementalHFiles(hbaseConf)
        load.doBulkLoad(new Path(filePath), conn.getAdmin, table, regionLocator)

        table.close()
        conn.close()
        ss.close()
      }
    }
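    After doBulkLoad returns, a quick sanity check is to compare the row count of the target table against what was read from the source. The snippet below is a minimal sketch of such a check and is not part of the original job; the object name VerifyBulkLoad and the literal table name (taken from the submit script further down) are hypothetical examples, and the standard HBase client API is assumed.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
    import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter

    import scala.collection.JavaConverters._

    object VerifyBulkLoad {
      def main(args: Array[String]): Unit = {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        // hypothetical target table name; substitute the real one
        val table = conn.getTable(TableName.valueOf("nbdpt:fp_xx_jbxx_full"))

        // return only the first cell of each row -- enough for counting and much cheaper
        val scan = new Scan()
        scan.setFilter(new FirstKeyOnlyFilter())

        val scanner = table.getScanner(scan)
        val rowCount = scanner.iterator().asScala.size
        println(s"rows in target table: $rowCount")

        scanner.close()
        table.close()
        conn.close()
      }
    }

    For very large tables a client-side scan like this is slow; the bundled RowCounter MapReduce job (org.apache.hadoop.hbase.mapreduce.RowCounter) performs the same count in parallel.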
    Put approach, implemented in Scala:
    package com.cbp.hbaseTableMerge

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    import scala.collection.mutable.ArrayBuffer

    /**
     * Put-based write.
     * Drawback: with a large data volume and no pre-split regions on the target table,
     * the regions can start rejecting writes during the load; fine for small batches.
     */
    object HbaseMergePut {
      // set the log level
      Logger.getLogger("org").setLevel(Level.INFO)

      def main(args: Array[String]): Unit = {
        // external arguments: source table, target table, start row, stop row,
        // repartition count, column family
        val readTable = args(0)
        val writeTable = args(1)
        val startRow = args(2)
        val stopRow = args(3)
        val partition = args(4)
        val columnf = args(5)

        // create the Spark SQL entry point, SparkSession
        val ss = SparkSession.builder().getOrCreate()

        // HBase configuration: source table, row range, target table
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set(TableInputFormat.INPUT_TABLE, readTable)
        hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRow)
        hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, stopRow)
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, writeTable)

        // job configuration for TableOutputFormat
        val job = Job.getInstance(hbaseConf)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        job.setOutputValueClass(classOf[Put])

        // stage 1: read the source table and cache it
        val rdd = ss.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])
          .repartition(Integer.valueOf(partition))
          .persist(StorageLevel.MEMORY_AND_DISK_SER)

        // stage 2: turn each Result into a Put and write it to the target table
        rdd.map(t => {
          val put = new Put(t._2.getRow)
          val cols = ArrayBuffer[String]()
          val colsMap = t._2.getFamilyMap(Bytes.toBytes(columnf))
          import scala.collection.JavaConversions._
          for (entry <- colsMap.entrySet()) {
            cols.append(Bytes.toString(entry.getKey))
          }
          cols.foreach(col => {
            put.addColumn(Bytes.toBytes(columnf), Bytes.toBytes(col),
              t._2.getValue(Bytes.toBytes(columnf), Bytes.toBytes(col)))
          })
          (new ImmutableBytesWritable, put)
        }).saveAsNewAPIHadoopDataset(job.getConfiguration)
      }
    }
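    The drawback called out in the class comment above can be softened by pre-splitting the target table before the job runs, so the puts spread over several regions instead of piling onto one. Below is a minimal, hypothetical sketch using the HBase 2.x Admin API; the table name, column family and split points are illustrative only and should be derived from the real row-key distribution.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
    import org.apache.hadoop.hbase.util.Bytes

    object CreatePreSplitTable {
      def main(args: Array[String]): Unit = {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        val admin = conn.getAdmin

        // hypothetical target table and column family; match the real schema
        val descriptor = TableDescriptorBuilder
          .newBuilder(TableName.valueOf("nbdpt:fp_xx_jbxx_full"))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("fpxx"))
          .build()

        // illustrative split points; choose real ones from the source table's row-key distribution
        val splitKeys: Array[Array[Byte]] = Array("2", "4", "6", "8").map(s => Bytes.toBytes(s))

        if (!admin.tableExists(descriptor.getTableName)) {
          admin.createTable(descriptor, splitKeys)
        }

        admin.close()
        conn.close()
      }
    }

    With split points that roughly match the incoming row keys, the write load (for the bulk-load path as well) is distributed across region servers from the start.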
    spark-submit script:
    nohup spark-submit \
      --master yarn \
      --deploy-mode client \
      --class com.cbp.hbaseTableMerge.HbaseMergeBulk2 \
      --driver-memory 2G \
      --executor-memory 18G \
      --executor-cores 4 \
      --num-executors 20 \
      --conf spark.default.parallelism=240 \
      --conf spark.speculation=true \
      --conf spark.speculation.interval=100 \
      --conf spark.speculation.quantile=0.75 \
      --conf spark.speculation.multiplier=1.5 \
      --conf spark.storage.memoryFraction=0.3 \
      --conf spark.shuffle.memoryFraction=0.3 \
      --conf spark.shuffle.service.enabled=true \
      com.cbp.test-1.0-SNAPSHOT.jar \
      "nbdpt:fp_xx_jbxx" \
      "nbdpt:fp_xx_jbxx_full" \
      "1592841600000" \
      "1593316959508" \
      "./tmp/row3" \
      "50" \
      "fpxx" \
      > ./log.file 2>&1 &
    2. HBase built-in MapReduce tools (Export/Import):
    # Export an HBase table:
    #   -Dhbase.export.scanner.batch=2000   scanner batch size
    #   -D mapred.output.compress=true      enable output compression
    #   test                                table name
    #   /hbase/test                         export path (prefix it with file:// to export to the local filesystem)
    hbase org.apache.hadoop.hbase.mapreduce.Export \
      -Dhbase.export.scanner.batch=2000 \
      -D mapred.output.compress=true \
      test \
      /hbase/test

    # Import the exported HDFS data into an HBase table
    # (the last argument is the import source path, an HDFS path here):
    hbase org.apache.hadoop.hbase.mapreduce.Driver import \
      -Dimport.bulk.output=./test/outPut \
      -Dmapreduce.map.speculative=true \
      -Dmapreduce.reduce.speculative=true \
      test \
      /hbase/test/*
    Note that with -Dimport.bulk.output the Import job writes HFiles to that output path instead of inserting into the table directly; those HFiles still have to be bulk-loaded into the table afterwards (for example with the completebulkload tool).