Spark Streaming DStream Join Operations (Stream-Stream Joins, Stream-Dataset Joins)

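All of the snippets below assume that a StreamingContext ssc (and its underlying SparkContext sc) already exists. A minimal sketch of that setup; the app name, master URL, and 10-second batch interval are illustrative assumptions, not part of the original:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

// Illustrative driver setup; adjust the master URL and batch interval as needed.
val conf = new SparkConf().setAppName("DStreamJoins").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
val sc = ssc.sparkContext

// After the DStream pipeline is defined, the job is started with:
// ssc.start()
// ssc.awaitTermination()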

I. Stream-Stream Joins

1. Basic stream-stream join

val stream1 = ssc.socketTextStream("master", 9999)
  .flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// A second source so the join is genuinely stream-to-stream
// (port 9998 is an illustrative assumption; the original joined the stream with itself).
val stream2 = ssc.socketTextStream("master", 9998)
  .flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// Each batch interval joins the current RDD of stream1 with that of stream2.
val result = stream1.join(stream2)
result.print()  // at least one output operation is required before ssc.start()

Here, in every batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, and fullOuterJoin (see the sketch below). Furthermore, it is often very useful to do joins over windows of the streams.
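For instance, a left outer join keeps every key of the left stream and wraps the right side in an Option. A minimal sketch, reusing stream1 and stream2 from above:

import org.apache.spark.streaming.dstream.DStream

// Keys that appear only in stream1 get a None on the right side.
val leftJoined: DStream[(String, (Int, Option[Int]))] = stream1.leftOuterJoin(stream2)
leftJoined.print()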

2. Windowed stream-stream join

val lines = ssc.socketTextStream("master", 9999)
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// Two windows of different lengths over the same stream. With the one-argument
// window(), both slide once per batch interval; join requires that the two
// DStreams have the same slide duration.
val windowDStream1 = words.window(Seconds(20))
val windowDStream2 = words.window(Minutes(1))
val result = windowDStream1.join(windowDStream2)
result.print()
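window() also accepts an explicit slide interval as a second argument. A small sketch (the durations are illustrative; both must be multiples of the batch interval):

// Recompute a 30-second window of counts every 10 seconds.
val slidingWindow = words.window(Seconds(30), Seconds(10))
slidingWindow.print()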

II. Stream-Dataset Joins

Create the RDD dataset:

val array = Array("spark,scala", "hadoop,java", "tensorflow,python", "solr,java", "hbase,java")
// Split each "master,follow" string into a (master, follow) key-value pair.
val masterRDD = sc.parallelize(array).map { row =>
  val Array(master, follow) = row.split(",")
  (master, follow)
}

Join the stream with the dataset:

val lines = ssc.socketTextStream("master", 9999)
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
val windowDStream = words.window(Seconds(20))
windowDStream.foreachRDD { rdd =>
  // join is lazy, so an action (here foreach) is needed for it to actually run.
  rdd.join(masterRDD).foreach(println)
}

Or:

val lines = ssc.socketTextStream("master", 9999)
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// transform applies an RDD-to-RDD function to each batch and returns a new DStream.
val joined = words.transform { rdd =>
  rdd.join(masterRDD)
}
joined.print()
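The practical difference between the two variants: foreachRDD is itself an output operation whose body runs on the driver once per batch, whereas transform returns a new DStream, so the joined result can feed further DStream transformations and output operations.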

In fact, you can also dynamically change the dataset you join against. The function provided to transform is re-evaluated at every batch interval, so it will use whatever dataset the reference currently points to.
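A minimal sketch of that idea; the lookupRDD variable and the code that refreshes it are hypothetical, not part of the original:

// A mutable reference; transform reads it again at every batch interval.
@volatile var lookupRDD = sc.parallelize(Array(("spark", "scala"), ("hadoop", "java")))

val joined2 = words.transform { rdd =>
  rdd.join(lookupRDD)  // uses whatever lookupRDD points to right now
}
joined2.print()

// Elsewhere (e.g. on a timer thread), the dataset can be swapped out:
// lookupRDD = sc.parallelize(Array(("flink", "java")))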
