Spark: Reading HBase Data and Saving It as CSV or Parquet


    Use a Spark SQL DataFrame to save HBase table data as CSV or Parquet files.
    Code:
    package com.cbp.spark_hbase

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    import scala.collection.mutable.ArrayBuffer

    object SparkReadHbaseSaveCsvOrParquet {
      Logger.getLogger("org").setLevel(Level.INFO)

      def main(args: Array[String]): Unit = {
        // External arguments: HBase table name and output path
        val readName = args(0)
        val outPath = args(1)
        val ss = SparkSession.builder().getOrCreate()

        // Point TableInputFormat at the source table and build the HBase RDD
        val hconf = HBaseConfiguration.create()
        hconf.set(TableInputFormat.INPUT_TABLE, readName)
        val hRdd = ss.sparkContext.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])

        val columnf = "fpmx"
        val columnNames = Array("xf_nsrsbh", "gf_nsrsbh", "fpdm", "fphm", "fpmxxh", "flbm", "kprq", "kpyf",
          "spmc", "jldw", "je", "se", "slv", "zfbz", "sjly", "ext")
        val df = hbaseRddToDF(ss, hRdd, columnf, columnNames)

        df.createOrReplaceTempView("fp_mx")
        // Assign the select result and write it (in the original the result was discarded)
        val result = ss.sql(
          s"""
             |select
             |xf_nsrsbh,gf_nsrsbh,fpdm,fphm,fpmxxh,flbm,kprq,kpyf,spmc,jldw,je,se,slv,zfbz,sjly,ext
             |from fp_mx
             |""".stripMargin)
        // Write as CSV; switch to .parquet(outPath) to save Parquet instead
        result.write.mode("append").csv(outPath)
        ss.close()
      }

      // Convert the HBase RDD to a DataFrame: (SparkSession, HBase RDD, column family, column names)
      def hbaseRddToDF(ss: SparkSession, hbaseRdd: RDD[(ImmutableBytesWritable, Result)],
                       columnFamily: String, columnNames: Array[String]): DataFrame = {
        // Build the Array[StructField] with a mutable ArrayBuffer; the row key comes first
        val structFields = ArrayBuffer(StructField("row_key", StringType))
        columnNames.foreach(y => {
          structFields.append(StructField(y, StringType))
        })
        // Define the schema; StructType is a case class: case class StructType(fields: Array[StructField])
        val dfschema = StructType(structFields.toArray)
        // Build the row RDD; a missing cell yields null (Bytes.toString(null) returns null)
        val rowRdd = hbaseRdd.map(rdd => {
          val values = ArrayBuffer[String](Bytes.toString(rdd._2.getRow))
          columnNames.foreach(column => {
            values.append(Bytes.toString(rdd._2.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column))))
          })
          Row.fromSeq(values.toSeq)
        })
        // createDataFrame(rowRDD, schema) turns the row RDD into a DataFrame
        ss.createDataFrame(rowRdd, dfschema)
      }
    }
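    The scan above pulls every column family in the table. If the table stores data beyond the fpmx family, TableInputFormat can be told to scan only that family, and to fetch more rows per RPC. The fragment below is a minimal sketch of that tweak for the main method above; it reuses readName, ss and the imports from the full listing, and the caching value of 500 is only an illustrative choice.

        // Narrow the HBase scan before building the RDD (sketch; drop-in for main above)
        val hconf = HBaseConfiguration.create()
        hconf.set(TableInputFormat.INPUT_TABLE, readName)
        // scan only the "fpmx" column family instead of the whole table
        hconf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "fpmx")
        // fetch more rows per RPC round trip; 500 is just an example value
        hconf.set(TableInputFormat.SCAN_CACHEDROWS, "500")
        val hRdd = ss.sparkContext.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])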
    spark-submit script:
    nohup spark-submit \
      --master yarn \
      --deploy-mode client \
      --class com.cbp.spark_hbase.SparkReadHbaseSaveCsvOrParquet \
      --driver-memory 4G \
      --executor-memory 20G \
      --executor-cores 4 \
      --num-executors 20 \
      --conf spark.default.parallelism=240 \
      --conf spark.speculation=true \
      --conf spark.speculation.interval=100 \
      --conf spark.speculation.quantile=0.75 \
      --conf spark.speculation.multiplier=1.5 \
      --conf spark.storage.memoryFraction=0.2 \
      --conf spark.shuffle.memoryFraction=0.4 \
      --conf spark.shuffle.service.enabled=true \
      com.cbp.test-1.0-SNAPSHOT.jar \
      "test" \
      "./test1" \
      > ./log.file 2>&1 &
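    The two quoted arguments are the positional args(0) and args(1) from the code: "test" is the HBase table to read and "./test1" is the output directory. Once the job finishes, the output can be spot-checked from spark-shell; the snippet below is a small sketch under that assumption (it reads the CSV back, so columns appear as _c0, _c1, ... because no header is written).

        // spark-shell sketch: read the job output back and spot-check it
        val out = spark.read.csv("./test1")   // use spark.read.parquet("./test1") if Parquet was written
        out.printSchema()
        println(s"rows written: ${out.count()}")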