Spark读取Hive表数据处理后写入HBase表

    技术2022-08-01  68

    1、put方式scala编码实现:
    package com.nbdpt.work4_hive2hbase2019

    import com.nbdpt.util.BaseUtil
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put, Table}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.TaskContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.util.TaskCompletionListener

    /**
     * Spark job that reads rows from a Hive table, derives an HBase row key for each row and
     * writes every selected column as a `Put` through `TableOutputFormat`.
     *
     * Program arguments, in order: `<hiveTable> <hbaseTable> <partitions>`.
     */
    object SparkReadHiveToHbasePut2019 {

      // Log level for the "org" logger hierarchy.
      Logger.getLogger("org").setLevel(Level.INFO)

      /** HBase table queried by `xf_nsrsbh` to obtain the enterprise code (`qyxx:qybm`). */
      private val QybmTableName = "nbdpt:qy_xydm_bm"

      /** Positions (in the select list) of the fields the row key is built from. */
      private val RowKeyFieldIndexes = Seq(0, 2, 3, 4, 7)

      /** Position of `sjly` in the select list; its value is always overwritten with "hive". */
      private val SjlyIndex = 14

      /**
       * Entry point.
       *
       * @param args `args(0)` Hive table name, `args(1)` HBase table name,
       *             `args(2)` number of partitions to shuffle the input into
       * @throws IllegalArgumentException if fewer than three arguments are supplied
       */
      def main(args: Array[String]): Unit = {
        require(
          args.length >= 3,
          "usage: SparkReadHiveToHbasePut2019 <hiveTable> <hbaseTable> <partitions>")
        val hiveTable = args(0)
        val hbaseTable = args(1)
        val partitions = args(2)

        val ss = SparkSession.builder().appName("removeData")
          .enableHiveSupport()
          .getOrCreate()

        try {
          // Read the source rows from Hive.
          val hiveDF: DataFrame = ss.sql(
            s"""
               |select
               |xf_nsrsbh,gf_nsrsbh,fpdm,fphm,fpmxxh,flbm,kprq,kpyf,spmc,jldw,je,se,slv,zfbz,sjly,ext
               |from ${hiveTable}
               |""".stripMargin)

          // Target column family and qualifiers. The array is small and is shipped to the
          // executors inside the task closure; the previous `broadcast(...)` call discarded
          // its result and therefore had no effect.
          val columnf = "fpmx"
          val columnNames = Array("xf_nsrsbh", "gf_nsrsbh", "fpdm", "fphm", "fpmxxh", "flbm", "kprq",
            "kpyf", "spmc", "jldw", "je", "se", "slv", "zfbz", "sjly", "ext")

          // HBase connection settings for the output table.
          val hbaseConf = HBaseConfiguration.create()
          hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTable)
          hbaseConf.set("hbase.zookeeper.quorum", "masterhost1, masterhost2, masterhost3")
          hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

          // Hadoop job describing the output format.
          val job = Job.getInstance(hbaseConf)
          job.setOutputKeyClass(classOf[ImmutableBytesWritable])
          job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
          job.setOutputValueClass(classOf[Put])

          // Build the (key, Put) pairs and write them to HBase.
          mkOutPutRdd(hiveDF, columnNames, columnf, partitions)
            .saveAsNewAPIHadoopDataset(job.getConfiguration)
        } finally {
          ss.close()
        }
      }

      /**
       * Turns the Hive rows into `(ImmutableBytesWritable, Put)` pairs.
       *
       * Rows with an empty row-key field are dropped, as are rows whose computed row key
       * contains "-" (which includes the "-1" placeholder of a failed enterprise-code lookup).
       *
       * The result is deliberately not persisted: it is consumed by a single action, and
       * HBase `Put` / `ImmutableBytesWritable` are not `java.io.Serializable`, so a serialized
       * storage level is wasted work at best and fails under the default Java serializer.
       *
       * @param hiveDF      source rows; must contain every name in `columnNames`
       * @param columnNames column qualifiers to write, also used to look up the Hive fields
       * @param columnf     target column family
       * @param partitions  number of partitions to shuffle the input into (decimal string)
       * @return one pair per retained row
       * @throws NumberFormatException if `partitions` is not an integer
       */
      def mkOutPutRdd(hiveDF: DataFrame,
                      columnNames: Array[String],
                      columnf: String,
                      partitions: String): RDD[(ImmutableBytesWritable, Put)] = {
        val numPartitions = partitions.trim.toInt
        val schema = hiveDF.schema
        // Resolve field positions and byte encodings once on the driver instead of per cell.
        val columnIndexes = columnNames.map(name => schema.fieldIndex(name))
        val qualifiers = columnNames.map(name => Bytes.toBytes(name))
        val family = Bytes.toBytes(columnf)

        hiveDF.rdd
          .filter(row => hasRowKeyFields(row))
          .coalesce(numPartitions, shuffle = true)
          .mapPartitions { rows =>
            // One HBase connection per partition instead of one per row.
            val lookupQybmFor = openQybmLookup()
            rows
              .map(row => (buildRowKey(lookupQybmFor, row), row))
              .filter { case (rowKey, _) => !rowKey.contains("-") }
              .map { case (rowKey, row) =>
                val put = new Put(Bytes.toBytes(rowKey))
                columnIndexes.indices.foreach { i =>
                  val value = nullHandle(cellValue(row, columnIndexes(i)))
                  put.addColumn(family, qualifiers(i), Bytes.toBytes(value))
                }
                (new ImmutableBytesWritable, put)
              }
          }
      }

      /**
       * Looks up the enterprise code for a single taxpayer id, opening and closing a dedicated
       * HBase connection. Kept for callers that need a one-off lookup; bulk processing uses a
       * per-partition connection instead.
       *
       * @param xf_nsrsbh row key in the lookup table
       * @return the stored code, or "-1" when it is missing, empty or "null"
       */
      def getQybm(xf_nsrsbh: String): String = {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        try {
          val table = conn.getTable(TableName.valueOf(QybmTableName))
          try lookupQybm(table, xf_nsrsbh) finally table.close()
        } finally {
          conn.close()
        }
      }

      /**
       * Builds the row key as `qybm + yy + fpbm + id`, where `fpbm` comes from
       * `BaseUtil.getFpbm(fpdm, fphm)`.
       */
      def mkRowKey(xfqybm: String, fpdm: String, fphm: String, yy: String, id: String): String =
        xfqybm + yy + BaseUtil.getFpbm(fpdm, fphm) + id

      /** Replaces a missing value (`null`, "" or "null") with "-1". */
      def nullHandle(str: String): String =
        if (str == null || str == "" || str == "null") "-1" else str

      /** Returns true when the value is `null`, "" or "null". */
      def nullDecide(str: Any): Boolean =
        str == null || str == "" || str == "null"

      /** Reads `qyxx:qybm` for the given key from an already opened lookup table. */
      private def lookupQybm(table: Table, xf_nsrsbh: String): String = {
        val result = table.get(new Get(Bytes.toBytes(xf_nsrsbh)))
        nullHandle(Bytes.toString(result.getValue(Bytes.toBytes("qyxx"), Bytes.toBytes("qybm"))))
      }

      /**
       * Opens the lookup table for the current task and returns a lookup function bound to it.
       * Must be called from inside a running task; the table and connection are closed when
       * the task completes, whether it succeeds or fails.
       */
      private def openQybmLookup(): String => String = {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        closeOnTaskCompletion(conn)
        val table = conn.getTable(TableName.valueOf(QybmTableName))
        closeOnTaskCompletion(table)
        nsrsbh => lookupQybm(table, nsrsbh)
      }

      /** Registers `resource` to be closed when the current Spark task finishes. */
      private def closeOnTaskCompletion(resource: java.io.Closeable): Unit = {
        TaskContext.get().addTaskCompletionListener(new TaskCompletionListener {
          override def onTaskCompletion(context: TaskContext): Unit = resource.close()
        })
      }

      /** True when every row-key field is present and `kpyf` is long enough to slice the year from. */
      private def hasRowKeyFields(row: Row): Boolean =
        row.length > 7 &&
          RowKeyFieldIndexes.forall(i => !nullDecide(row(i))) && {
            val kpyf = row(7).toString
            // The second check keeps `substring(2, 4)` safe when the value is padded with spaces.
            kpyf.length > 4 && kpyf.trim.length >= 4
          }

      /** Derives the row key of one row, using `lookupQybmFor` to resolve the enterprise code. */
      private def buildRowKey(lookupQybmFor: String => String, row: Row): String = {
        def field(i: Int): String = String.valueOf(row(i)).trim
        val yy = field(7).substring(2, 4)
        mkRowKey(lookupQybmFor(field(0)), field(2), field(3), yy, field(4))
      }

      /**
       * String form of one cell. The `sjly` position always yields "hive"; other cells are
       * converted with `toString`, so non-string Hive columns no longer cause a
       * `ClassCastException`. Returns `null` for a null cell (handled by `nullHandle`).
       */
      private def cellValue(row: Row, idx: Int): String =
        if (idx == SjlyIndex) "hive"
        else Option(row.get(idx)).map(_.toString).orNull
    }
    2、bulk load方式scala编码实现:
    package com.nbdpt.work4_hive2hbase2019

    import com.nbdpt.util.BaseUtil
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Table}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.util.TaskCompletionListener

    import scala.collection.mutable.ArrayBuffer

    /**
     * Spark job that reads rows from a Hive table, writes them as HFiles with
     * `HFileOutputFormat2` and then bulk-loads those files into an HBase table.
     *
     * Program arguments, in order: `<hiveTable> <hbaseTable> <hfilePath> <partitions>`.
     */
    object SparkReadHiveToHbaseLoad2019 {

      // Log level for the "org" logger hierarchy.
      Logger.getLogger("org").setLevel(Level.INFO)

      /** HBase table queried by `xf_nsrsbh` to obtain the enterprise code (`qyxx:qybm`). */
      private val QybmTableName = "nbdpt:qy_xydm_bm"

      /** Positions (in the select list) of the fields the row key is built from. */
      private val RowKeyFieldIndexes = Seq(0, 2, 3, 4, 7)

      /** Position of `sjly` in the select list; its value is always overwritten with "hive". */
      private val SjlyIndex = 14

      /**
       * Entry point.
       *
       * @param args `args(0)` Hive table name, `args(1)` HBase table name,
       *             `args(2)` output path for the generated HFiles,
       *             `args(3)` number of partitions to shuffle the input into
       * @throws IllegalArgumentException if fewer than four arguments are supplied
       */
      def main(args: Array[String]): Unit = {
        require(
          args.length >= 4,
          "usage: SparkReadHiveToHbaseLoad2019 <hiveTable> <hbaseTable> <hfilePath> <partitions>")
        val hiveTable = args(0)
        val hbaseTable = args(1)
        val hfilePath = args(2)
        val partitions = args(3)

        // The serializer setting only takes effect if the conf is handed to the builder.
        val conf = new SparkConf()
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        val ss = SparkSession.builder().appName("hiveTohbase")
          .config(conf)
          .enableHiveSupport()
          .getOrCreate()

        try {
          // Read the source rows from Hive.
          val hiveDF: DataFrame = ss.sql(
            s"""
               |select
               |xf_nsrsbh,gf_nsrsbh,fpdm,fphm,fpmxxh,flbm,kprq,kpyf,spmc,jldw,je,se,slv,zfbz,sjly,ext
               |from ${hiveTable}
               |""".stripMargin)

          // Target column family and qualifiers. The array is shipped inside the task closure;
          // the previous `broadcast(...)` call discarded its result and had no effect.
          val columnf = "fpmx"
          val columnNames = Array("xf_nsrsbh", "gf_nsrsbh", "fpdm", "fphm", "fpmxxh", "flbm", "kprq",
            "kpyf", "spmc", "jldw", "je", "se", "slv", "zfbz", "sjly", "ext")

          val hbaseConf = HBaseConfiguration.create()
          hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name", hbaseTable)
          hbaseConf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000)

          val conn = ConnectionFactory.createConnection(hbaseConf)
          try {
            val targetTable = TableName.valueOf(hbaseTable)
            val regionLocator = conn.getRegionLocator(targetTable)
            val table = conn.getTable(targetTable)
            val admin = conn.getAdmin
            try {
              val job = Job.getInstance(hbaseConf)
              job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
              job.setMapOutputValueClass(classOf[KeyValue])
              HFileOutputFormat2.configureIncrementalLoadMap(job, table.getDescriptor)

              // `Job.getInstance` copies the configuration, so whatever
              // `configureIncrementalLoadMap` sets lives in `job.getConfiguration`, not in
              // `hbaseConf`; the writer must be given the job's configuration.
              mkOutputRdd(hiveDF, columnf, columnNames, partitions)
                .saveAsNewAPIHadoopFile(
                  hfilePath,
                  classOf[ImmutableBytesWritable],
                  classOf[KeyValue],
                  classOf[HFileOutputFormat2],
                  job.getConfiguration)

              val load = new LoadIncrementalHFiles(hbaseConf)
              load.doBulkLoad(new Path(hfilePath), admin, table, regionLocator)
            } finally {
              admin.close()
              table.close()
              regionLocator.close()
            }
          } finally {
            conn.close()
          }
        } finally {
          ss.close()
        }
      }

      /**
       * Turns the Hive rows into `(row key, KeyValue)` pairs sorted by row key, column family
       * and qualifier, ready to be written by `HFileOutputFormat2`.
       *
       * Rows with an empty row-key field are dropped, as are rows whose computed row key
       * contains "-" (which includes the "-1" placeholder of a failed enterprise-code lookup).
       *
       * @param hiveDF      source rows; must contain every name in `columnNames`
       * @param columnf     target column family
       * @param columnNames column qualifiers to write, also used to look up the Hive fields
       * @param partitions  number of partitions to shuffle the input into (decimal string)
       * @return one pair per retained cell, globally sorted
       * @throws NumberFormatException if `partitions` is not an integer
       */
      def mkOutputRdd(hiveDF: DataFrame,
                      columnf: String,
                      columnNames: Array[String],
                      partitions: String): RDD[(ImmutableBytesWritable, KeyValue)] = {
        val numPartitions = partitions.trim.toInt
        val schema = hiveDF.schema
        // Resolve field positions once on the driver instead of per cell.
        val columnIndexes = columnNames.map(name => schema.fieldIndex(name))

        val cells: RDD[(String, (String, String, String))] = hiveDF.rdd
          .filter(row => hasRowKeyFields(row))
          .coalesce(numPartitions, shuffle = true)
          .mapPartitions { rows =>
            // One HBase connection per partition instead of one per row.
            val lookupQybmFor = openQybmLookup()
            rows
              // The row key is the same for every column of a row: compute it once.
              .map(row => (buildRowKey(lookupQybmFor, row), row))
              .filter { case (rowKey, _) => !rowKey.contains("-") }
              .flatMap { case (rowKey, row) =>
                columnNames.indices.map { i =>
                  (rowKey, (columnf, columnNames(i), nullHandle(cellValue(row, columnIndexes(i)))))
                }
              }
          }
          // `sortBy` runs a sampling job over this RDD before the real shuffle, so it is
          // evaluated twice. The cache is intentionally NOT released here: unpersisting before
          // the caller's save action would throw the cached data away and repeat every HBase
          // lookup. It is freed when the Spark session is closed.
          .persist(StorageLevel.MEMORY_AND_DISK_SER)

        cells
          .sortBy { case (rowKey, (family, qualifier, _)) => (rowKey, family, qualifier) }
          .map { case (rowKey, (family, qualifier, value)) =>
            val rowBytes = Bytes.toBytes(rowKey)
            val cell = new KeyValue(
              rowBytes, Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
            (new ImmutableBytesWritable(rowBytes), cell)
          }
      }

      /**
       * Looks up the enterprise code for a single taxpayer id, opening and closing a dedicated
       * HBase connection. Kept for callers that need a one-off lookup; bulk processing uses a
       * per-partition connection instead.
       *
       * @param xf_nsrsbh row key in the lookup table
       * @return the stored code, or "-1" when it is missing, empty or "null"
       */
      def getQybm(xf_nsrsbh: String): String = {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        try {
          val table = conn.getTable(TableName.valueOf(QybmTableName))
          try lookupQybm(table, xf_nsrsbh) finally table.close()
        } finally {
          conn.close()
        }
      }

      /**
       * Builds the row key as `qybm + yy + fpbm + id`, where `fpbm` comes from
       * `BaseUtil.getFpbm(fpdm, fphm)`.
       */
      def mkRowKey(xfqybm: String, fpdm: String, fphm: String, yy: String, id: String): String =
        xfqybm + yy + BaseUtil.getFpbm(fpdm, fphm) + id

      /** Replaces a missing value (`null`, "" or "null") with "-1". */
      def nullHandle(str: String): String =
        if (str == null || str == "" || str == "null") "-1" else str

      /** Returns true when the value is `null`, "" or "null". */
      def nullDecide(str: Any): Boolean =
        str == null || str == "" || str == "null"

      /** Reads `qyxx:qybm` for the given key from an already opened lookup table. */
      private def lookupQybm(table: Table, xf_nsrsbh: String): String = {
        val result = table.get(new Get(Bytes.toBytes(xf_nsrsbh)))
        nullHandle(Bytes.toString(result.getValue(Bytes.toBytes("qyxx"), Bytes.toBytes("qybm"))))
      }

      /**
       * Opens the lookup table for the current task and returns a lookup function bound to it.
       * Must be called from inside a running task; the table and connection are closed when
       * the task completes, whether it succeeds or fails.
       */
      private def openQybmLookup(): String => String = {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        closeOnTaskCompletion(conn)
        val table = conn.getTable(TableName.valueOf(QybmTableName))
        closeOnTaskCompletion(table)
        nsrsbh => lookupQybm(table, nsrsbh)
      }

      /** Registers `resource` to be closed when the current Spark task finishes. */
      private def closeOnTaskCompletion(resource: java.io.Closeable): Unit = {
        TaskContext.get().addTaskCompletionListener(new TaskCompletionListener {
          override def onTaskCompletion(context: TaskContext): Unit = resource.close()
        })
      }

      /** True when every row-key field is present and `kpyf` is long enough to slice the year from. */
      private def hasRowKeyFields(row: Row): Boolean =
        row.length > 7 &&
          RowKeyFieldIndexes.forall(i => !nullDecide(row(i))) && {
            val kpyf = row(7).toString
            // The second check keeps `substring(2, 4)` safe when the value is padded with spaces.
            kpyf.length > 4 && kpyf.trim.length >= 4
          }

      /** Derives the row key of one row, using `lookupQybmFor` to resolve the enterprise code. */
      private def buildRowKey(lookupQybmFor: String => String, row: Row): String = {
        def field(i: Int): String = row(i).toString.trim
        val yy = field(7).substring(2, 4)
        mkRowKey(lookupQybmFor(field(0)), field(2), field(3), yy, field(4))
      }

      /**
       * String form of one cell. The `sjly` position always yields "hive"; other cells are
       * converted with `toString`, so non-string Hive columns no longer cause a
       * `ClassCastException`. Returns `null` for a null cell (handled by `nullHandle`).
       */
      private def cellValue(row: Row, idx: Int): String =
        if (idx == SjlyIndex) "hive"
        else Option(row.get(idx)).map(_.toString).orNull
    }
    submit提交脚本:
    # Submits the bulk-load job to YARN in cluster mode, detached from the terminal, with all
    # output redirected to ./log.file.
    # Program arguments, in order: Hive table, HBase table, HFile output path, partition count.
    # Each backslash must be the last character on its line for the continuation to work.
    # NOTE(review): spark.storage.memoryFraction and spark.shuffle.memoryFraction are legacy
    # memory settings; on Spark 1.6+ they are presumably ignored unless
    # spark.memory.useLegacyMode=true - confirm against the cluster's Spark version.
    nohup spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --class com.nbdpt.work4_hive2hbase2019.SparkReadHiveToHbaseLoad2019 \
      --driver-memory 4G \
      --executor-memory 14G \
      --executor-cores 6 \
      --num-executors 50 \
      --conf spark.default.parallelism=900 \
      --conf spark.speculation=true \
      --conf spark.speculation.interval=100 \
      --conf spark.speculation.quantile=0.75 \
      --conf spark.speculation.multiplier=1.5 \
      --conf spark.storage.memoryFraction=0.2 \
      --conf spark.shuffle.memoryFraction=0.4 \
      --conf spark.shuffle.service.enabled=true \
      com.cbp.test-1.0-SNAPSHOT.jar \
      "test_2016" \
      "test_2016" \
      "./test1" \
      "500" \
      > ./log.file 2>&1 &
    Processed: 0.011, SQL: 9