Spark读取csv和parquet文件将数据写入Hbase表

    技术2022-07-17  83

    1、Spark读取csv文件将数据写入Hbase表中编码实现。
    scala编码:
    package com.cbp.spark_hbase

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.storage.StorageLevel
    import scala.collection.mutable.ArrayBuffer

    /**
     * Spark job that reads a CSV (or, by switching one line, Parquet) data set and
     * bulk-loads it into an existing HBase table.
     *
     * Every non-null column of every input row becomes one cell in column family
     * `fpmx`; the row key is the string form of the row's first column and the
     * qualifier is the column name. The cells are written as HFiles under a staging
     * directory and then handed to [[LoadIncrementalHFiles]].
     *
     * Arguments: `<hbaseTable> <inputPath> <hfileOutputPath>`.
     */
    object SparkReadCsvOrParquetToHbase {
      Logger.getLogger("org").setLevel(Level.INFO)

      /**
       * Flattens a DataFrame into `(rowKey, (family, qualifier, value))` tuples.
       *
       * Values are read with `row.get(i).toString` rather than an untyped `getAs`,
       * so non-string column types (e.g. from Parquet) are handled as well. Null
       * cells are skipped, which leaves the cell absent in HBase. Rows whose first
       * column is null are skipped entirely because they have no row key.
       *
       * @param df     input data; its first column supplies the row key
       * @param family column family every cell is written to
       * @return one tuple per non-null cell
       */
      private def toCells(df: DataFrame, family: String): RDD[(String, (String, String, String))] = {
        val fields: Array[StructField] = df.schema.fields
        require(fields.nonEmpty, "input data set has no columns")
        // Resolve the qualifiers once on the driver instead of once per row.
        val qualifiers: Array[String] = fields.map(_.name)

        df.rdd.flatMap { row =>
          if (row.isNullAt(0)) {
            Seq.empty[(String, (String, String, String))]
          } else {
            val rowKey = row.get(0).toString
            qualifiers.indices.collect {
              case i if !row.isNullAt(i) => (rowKey, (family, qualifiers(i), row.get(i).toString))
            }
          }
        }
      }

      /**
       * Entry point.
       *
       * @param args `args(0)` target HBase table, `args(1)` input path,
       *             `args(2)` staging directory for the generated HFiles
       */
      def main(args: Array[String]): Unit = {
        require(
          args.length >= 3,
          "usage: SparkReadCsvOrParquetToHbase <hbaseTable> <inputPath> <hfileOutputPath>")
        val tableName = args(0)
        val readPath = args(1)
        val filePath = args(2)
        val columnf = "fpmx"

        val ss = SparkSession.builder().getOrCreate()
        try {
          val hconf = HBaseConfiguration.create()
          hconf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)
          hconf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000)

          val conn = ConnectionFactory.createConnection(hconf)
          try {
            val hbaseTable = TableName.valueOf(tableName)
            val regionLocator = conn.getRegionLocator(hbaseTable)
            val table = conn.getTable(hbaseTable)
            val admin = conn.getAdmin
            try {
              val job = Job.getInstance(hconf)
              job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
              job.setMapOutputValueClass(classOf[KeyValue])
              HFileOutputFormat2.configureIncrementalLoadMap(job, table.getDescriptor)

              val df: DataFrame = ss.read.csv(readPath)
              // val df: DataFrame = ss.read.parquet(readPath)

              // Persisted because sortBy evaluates its input more than once
              // (sampling for range boundaries, then the shuffle itself).
              val cells = toCells(df, columnf).persist(StorageLevel.MEMORY_AND_DISK_SER)
              try {
                cells
                  // HFiles must be written in row / family / qualifier order.
                  .sortBy { case (rowKey, (family, qualifier, _)) => (rowKey, family, qualifier) }
                  .map { case (rowKey, (family, qualifier, value)) =>
                    val rowBytes = Bytes.toBytes(rowKey)
                    val cell = new KeyValue(
                      rowBytes,
                      Bytes.toBytes(family),
                      Bytes.toBytes(qualifier),
                      Bytes.toBytes(value))
                    (new ImmutableBytesWritable(rowBytes), cell)
                  }
                  // Job.getInstance copies the configuration, so the settings made by
                  // configureIncrementalLoadMap live in job.getConfiguration, not hconf.
                  .saveAsNewAPIHadoopFile(
                    filePath,
                    classOf[ImmutableBytesWritable],
                    classOf[KeyValue],
                    classOf[HFileOutputFormat2],
                    job.getConfiguration)
              } finally {
                cells.unpersist()
              }

              val load = new LoadIncrementalHFiles(hconf)
              load.doBulkLoad(new Path(filePath), admin, table, regionLocator)
            } finally {
              admin.close()
              table.close()
              regionLocator.close()
            }
          } finally {
            conn.close()
          }
        } finally {
          ss.close()
        }
      }
    }
    submit提交脚本:
    # Submit the bulk-load job to YARN in the background; all output goes to ./log.file.
    # --class must name the object defined in the jar (SparkReadCsvOrParquetToHbase).
    # Positional arguments: <hbaseTable> <inputPath> <hfileOutputPath>.
    # NOTE(review): spark.storage.memoryFraction and spark.shuffle.memoryFraction are
    # legacy memory settings; confirm they still take effect on the Spark version in use.
    nohup spark-submit \
      --master yarn \
      --deploy-mode client \
      --class com.cbp.spark_hbase.SparkReadCsvOrParquetToHbase \
      --driver-memory 4G \
      --executor-memory 20G \
      --executor-cores 4 \
      --num-executors 20 \
      --conf spark.default.parallelism=240 \
      --conf spark.speculation=true \
      --conf spark.speculation.interval=100 \
      --conf spark.speculation.quantile=0.75 \
      --conf spark.speculation.multiplier=1.5 \
      --conf spark.storage.memoryFraction=0.2 \
      --conf spark.shuffle.memoryFraction=0.4 \
      --conf spark.shuffle.service.enabled=true \
      com.cbp.test-1.0-SNAPSHOT.jar \
      "test" \
      "./test1" \
      "./temp" \
      > ./log.file 2>&1 &
    2、使用hbase工具 ImportTsv 将 csv 文件导入hbase表。
    方式一、put直接写入
    # Import ./test into table "test" through regular puts.
    # The first input column is the row key; the next two go to fpmx:xf_nsrsbh and fpmx:gf_nsrsbh.
    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
      -Dimporttsv.columns=HBASE_ROW_KEY,fpmx:xf_nsrsbh,fpmx:gf_nsrsbh \
      test \
      ./test
    方式二、bulk load,先写临时文件,再用LoadIncrementalHFiles工具导入
    # Step 1: write HFiles to ./temp instead of issuing puts.
    # CSV is comma-separated, so the separator is overridden (ImportTsv defaults to TAB).
    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
      -Dimporttsv.separator="," \
      -Dimporttsv.bulk.output=./temp \
      -Dimporttsv.columns=HBASE_ROW_KEY,fpmx:xf_nsrsbh,fpmx:gf_nsrsbh \
      test \
      ./test

    # Step 2: load the generated HFiles into the existing table "test".
    # create.table=no: do not create the table if it is missing.
    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
      -Dcreate.table=no \
      ./temp \
      test
    Processed: 0.009, SQL: 9