使用Scala语言通过Spark实现Join

    技术2022-07-12  69

    join

    join类似于SQL的inner join操作,返回结果是前面和后面集合中配对成功的,过滤掉关联不上的。

    leftOuterJoin

    leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。

    rightOuterJoin

    rightOuterJoin类似于SQL中的有外关联,返回结果以参数也就是右边的RDD为主,关联不上的记录为空

    接下来我们通过SQL与代码相结合的方式,了解一下,如何使用Spark实现连接查询

    学生信息表:stu_info(stu_id,stu_name,sex,province),数据如下内容:

    1,zhangsan,1,zj 2,lisi,0,gs 3,wangwu,1,bj 4,zhaoliu,0,sh

    成绩信息score_info(score_id,stu_id,course_name,score_num),数据如下内容:

    1,1,yuwen,56 2,1,shuxue,98 3,2,yuwen,76 4,2,shuxue,45 5,3,yuwen,89 6,3,shuxue,99 7,4,yuwen,34 8,4,shuxue,76

    需求:实现所有科目、成绩对应的学生姓名、性别、省份

    现在我们用Sql来描述一下我们想要的功能:

    select  stu.stu_id,stu.stu_name,stu.sex,stu.province,score.cource_name,score.score_num  from  stu_info stu  join  score_info score  on  stu.stu_id=score.stu_id

    如何使用Scala语言调用Spark函数来实现?

    val stuData = sc.textFile(args(0)) val scoreData = sc.textFile(args(1)) val stuRdd=stuData.map(line=>{   val cells=line.split(",")   (cells(0),(cells(1),cells(2),(3))) }) val scoreRdd=scoreData.map(line=>{   val cells=line.split(",")   (cells(1),(cells(2),cells(3))) }) val failedScoreRdd = scoreData.map(line => {   val cells = line.split(",")   (cells(0).toInt, cells(1).toInt, cells(2), cells(3).toInt) }).filter(line=>line._4<60) val joinRdd=stuRdd.join(scoreRdd) joinRdd.map(rdd=>{   val stu=rdd._2._1   val score=rdd._2._2   (rdd._1,stu._1,stu._2,stu._3,score._1,score._2) }) joinRdd.repartition(1).saveAsTextFile(args(2))

    查看输出文件:

    (4,zhaoliu,0,3,yuwen,34) (4,zhaoliu,0,3,shuxue,76) (2,lisi,0,3,yuwen,76) (2,lisi,0,3,shuxue,45) (3,wangwu,1,3,yuwen,89) (3,wangwu,1,3,shuxue,99) (1,zhangsan,1,3,yuwen,56) (1,zhangsan,1,3,shuxue,98)

    以上为标准非格式化输出,调整代码如下:

    val joinRdd=stuRdd.join(scoreRdd).map(rdd=>{   val stu=rdd._2._1   val score=rdd._2._2   s"${rdd._1},${stu._1},${stu._2},${stu._3},${score._1},${score._2}" })

    输出信息如下:

    4,zhaoliu,0,3,yuwen,34 4,zhaoliu,0,3,shuxue,76 2,lisi,0,3,yuwen,76 2,lisi,0,3,shuxue,45 3,wangwu,1,3,yuwen,89 3,wangwu,1,3,shuxue,99 1,zhangsan,1,3,yuwen,56 1,zhangsan,1,3,shuxue,98

    说明: 通过s""来进行字符串拼接,中间变量可用${变量名}来进行格式化输出 需求:查询未及格信息

    select stu.stu_id,stu.stu_name,stu.sex,stu.province,score.cource_name,score.score_num from stu_info stu join score_info score on stu.stu_id=score.stu_id where score.score_num <60

    Scala代码如下:

    val stuRdd=stuData.map(line=>{   val cells=line.split(",")   (cells(0),(cells(1),cells(2),(3))) }) val scoreRdd=scoreData.map(line=>{   val cells=line.split(",")   (cells(1),(cells(2),cells(3))) }).filter(rdd=>{rdd._2._2.toInt<60}) val joinRdd=stuRdd.join(scoreRdd).map(rdd=>{   val stu=rdd._2._1   val score=rdd._2._2   s"${rdd._1},${stu._1},${stu._2},${stu._3},${score._1},${score._2}" })

    输出信息如下:

    1,zhangsan,1,3,yuwen,56 4,zhaoliu,0,3,yuwen,34 2,lisi,0,3,shuxue,45

     

     

    本文版权归https://www.mulhyac.com所有,转载请注明出处.

    网址:https://www.mulhyac.com

    标签:SparkSQLSparkHadoop

    Processed: 0.017, SQL: 9