使用Scala编写sparkSQL

    技术2023-11-11  96

    1. 创建联接: spark1.x: val conf = new SparkConf().setAppName(“spark sql one”).setMaster(“local[*]”)

    val sc = new SparkContext(conf) //创建SQLContext对象 val sqlContext = new SQLContext(sc)

    spark2.x: val spark = SparkSession .builder() .appName(“Spark SQL basic example”)

    .master("local[*]")

    .getOrCreate()

    2. 如何来创建DataFrame: 1) DataFrame与Dataset的关系: dataFrame=Dataset[Row] 2) 创建的两种方式: 1) 结合 case class来创建DataFrame,利用反射机制来构建一个DataFrame( RDD的更高层次的封装,提供各种元数据。在RDD运行时,可以进行一些优化 ) 1)建立一个case class ( View视图 ) 2)读取数据,创建RDD( model模型 ), 返回 RDD[样例类对象] 3) 通过隐式类型转换将RDD转成 DataFrame import sqlContext.implicits._ val df = personRDD.toDF()

    以上也可以: 1)直接读取 val dataset=session.read.textFile("") 2) map( )分解成 Dataset[ 元组 ] 3) dataframe=dataset.toDF( 列名) 2) 结合structType( 结构类型对象来声明这些字段的元信息 ) 1) 读取数据时返回 RDD[Row] 2) 定义structType: val schema = StructType(List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("height", DoubleType, true) )) 3) 创建DataFrame,将RDD[Row]与structType绑定. session/sqlContext.createDataFrame(personRDD, schema)

    3. 如何操作DataFrame问题

    sql来操作 df.registerTempTable(“t_person”) //注册成临时表或视图 val resultDataFrame = sqlContext.sql(“select * from t_person order by age desc,height desc, name asc”)

    利用DataFrame的API来操作 ( DSL )

    分知识: DSL : 领域对象语言 正则表达式, xpath import sqlContext.implicits._ $“字段名” -> 将字符串转为Column对象DataFrame的常见API: select() orderBy() …

    4. spark SQL :自定义函数: 自定义函数类型: UDF: user defined function 1V1 , UDAF: user defined aggregation function nV1, UDTF: user defined table generation function 1vN

    udf的使用: spark.udf.register(“ip2province”, (ipNum: Long) => { //通过广播变量取出 ipRulesArray val ipRulesArray = broadcastRef.value var province = “unkown” var index = YcUtil.binarySearch(ipRulesArray, ipNum) if (index != -1) { province = ipRulesArray(index).getAs(“province”) } province }) 如何命名用这个注册的自定义函数: 1) SQL: spark.sql("select ip2province(ipNum) province, count(*) cn from v_accessIpLong group by province ") 2) DSl: accessIpLongDataFrame.selectExpr($“ip2province(ipNum)”.as(“province”).toString() ).groupBy( $“province”).count()

    Processed: 0.010, SQL: 9