Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。
数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。
DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。
文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取,Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,记住目前不支持嵌套目录。
streamingContext.textFileStream(dataDirectory)
注意事项:
文件需要有相同的数据格式;
文件进入 dataDirectory的方式需要通过移动或者重命名来实现;
一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;
/** * 从File中采集数据 */ object B_FileDataSource { def main(args: Array[String]): Unit = { //1.初始化SparkConf val conf: SparkConf = new SparkConf() .setMaster("local[3]") .setAppName("Spark Streaming Demo") //2.实时数据分析环境对象 //采集周期:以指定的时间为周期采集实时数据 val ssc = new StreamingContext(conf,Seconds(5)) //远程监控端口 val rid: DStream[String] = ssc.textFileStream("in/sparkstream/test") //解析 val dstream: DStream[String] = rid.flatMap(_.split(" ")) val dstream2: DStream[(String, Int)] = dstream.map((_,1)).reduceByKey(_+_) dstream2.print() //启动SparkStreaming最重要的设置 //1.启动采集器 ssc.start() //2.driver等待采集器执行 ssc.awaitTermination() } }使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理
/** * 循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount */ object D_Rdd { def main(args: Array[String]): Unit = { //1.初始化SparkConf val conf: SparkConf = new SparkConf() .setMaster("local[3]") .setAppName("Spark Streaming Demo") //2.实时数据分析环境对象 //采集周期:以指定的时间为周期采集实时数据 val ssc = new StreamingContext(conf, Seconds(5)) //3.创建RDD队列 val rddQueue = new mutable.Queue[RDD[Int]]() //4.创建QueueInputDStream val inputStream = ssc.queueStream(rddQueue, oneAtATime = false) //5.处理队列中的RDD数据 val mappedStream = inputStream.map((_, 1)) val reducedStream = mappedStream.reduceByKey(_ + _) //6.打印结果 reducedStream.print() //7.启动任务 ssc.start() //8.循环创建并向RDD队列中放入RDD for (i <- 1 to 5) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) } //启动SparkStreaming最重要的设置 //9.driver等待采集器执行 ssc.awaitTermination() } }需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
/** * 自定义采集器 */ object C_MyReceiver { def main(args: Array[String]): Unit = { //1.初始化SparkConf val conf: SparkConf = new SparkConf() .setMaster("local[3]") .setAppName("Spark Streaming Demo") //2.实时数据分析环境对象 //采集周期:以指定的时间为周期采集实时数据 val ssc = new StreamingContext(conf, Seconds(5)) //远程监控端口 val rid: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("118.89.20.151", 9999)) //解析 val dstream: DStream[String] = rid.flatMap(_.split(" ")) val dstream2: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKey(_ + _) dstream2.print() //启动SparkStreaming最重要的设置 //1.启动采集器 ssc.start() //2.driver等待采集器执行 ssc.awaitTermination() } } /** * 声明自定义采集器,继承Receiver类,实现抽象方法 */ class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) { var socket: Socket = null def receive(): Unit = { socket = new Socket(host, port) val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) var line: String = null while ((line = reader.readLine()) != null) { //将采集的数据存储到采集器的内部进行转换 if ("END".equals(line)) { return } else { this.store(line) } } } override def onStart(): Unit = { new Thread(new Runnable { override def run(): Unit = { receive() } }) } override def onStop(): Unit = { socket.close() socket = null } }无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。
注意,针对键值对的DStream转化操作(比如reduceByKey())要添加import StreamingContext._才能在Scala中使用
直接看代码
/** * * @author Xiahu * @create 2020/6/29 * * 状态转换 */ object F_StateTransform { def main(args: Array[String]): Unit = { //1.初始化SparkConf val conf: SparkConf = new SparkConf() .setMaster("local[3]") .setAppName("Spark Streaming Demo") //2.实时数据分析环境对象 //采集周期:以指定的时间为周期采集实时数据 val ssc = new StreamingContext(conf, Seconds(5)) //远程监控端口 val rid: DStream[String] = ssc.textFileStream("in/sparkstream/test") //todo 保存数据的状态,需要设定检查点的路径 ssc.checkpoint("/tmp") //解析 val dstream: DStream[String] = rid.flatMap(_.split(" ")) val dstream2: DStream[(String, Int)] = dstream.map((_, 1)) //将转换结构后的数据做聚合操作 val value: DStream[(String, Int)] = dstream2.updateStateByKey { case (seq, buffer) => { val sum: Int = buffer.getOrElse(0) + seq.sum Option(sum) } } println(value) /** * 无状态数据与有状态数据: * 无状态: 有状态: * (A,1) (A,1) * (A,1) (A,1),(A,1) * (A,3) (A,3),(A,1),(A,1) * (A,1) (A,1),(A,3),(A,1),(A,1) * (A,2) (A,2),(A,2),(A,1),(A,3),(A,1),(A,1) * * 有状态的数据每次都会带着之前的数据一起返回 */ //启动SparkStreaming最重要的设置 //1.启动采集器 ssc.start() //2.driver等待采集器执行 ssc.awaitTermination() } }WindowOperations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。
基于窗口的操作会在一个比StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
注意:所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。
scala 内的滑步操作 /** * * @author Xiahu * @create 2020/6/29 * scala窗口操作 */ object G_WindowDemo { def main(args: Array[String]): Unit = { val list = List(1, 2, 3, 4, 5, 6, 7, 8) //滑动窗口 val iterator: Iterator[List[Int]] = list.sliding(3) for (num <- iterator) { println(num) } } } spark 内的 窗口操作 /** * * @author Xiahu * @create 2020/6/29 * * spark 内使用窗口 */ object G_WindowDemo2 { def main(args: Array[String]): Unit = { //1.初始化SparkConf val conf: SparkConf = new SparkConf() .setMaster("local[3]") .setAppName("Spark Streaming Demo") //2.实时数据分析环境对象 //采集周期:以指定的时间为周期采集实时数据 val ssc = new StreamingContext(conf, Seconds(5)) //远程监控端口 val rid: DStream[String] = ssc.textFileStream("in/sparkstream/test") //窗口大小应该为采集周期的整数倍,窗口滑动的步长也是采集周期的整数倍 val windowDstream: DStream[String] = rid.window(Seconds(10), Seconds(10)) //解析 val dstream: DStream[String] = windowDstream.flatMap(_.split(" ")) val dstream2: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKey((_ + _)) println(dstream2) //启动SparkStreaming最重要的设置 //1.启动采集器 ssc.start() //2.driver等待采集器执行 ssc.awaitTermination() } }Transform原语允许DStream上执行任意的RDD-to-RDD函数。
即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。
该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
/** * * @author Xiahu * @create 2020/6/29 * * 转换 */ object H_Transform { def main(args: Array[String]): Unit = { //1.初始化SparkConf val conf: SparkConf = new SparkConf() .setMaster("local[3]") .setAppName("Spark Streaming Demo") //2.实时数据分析环境对象 //采集周期:以指定的时间为周期采集实时数据 val ssc = new StreamingContext(conf, Seconds(5)) //远程监控端口 val rid: DStream[String] = ssc.textFileStream("in/sparkstream/test") //todo Driver 端 执行1次 rid.map { case x => { //todo Executor 端 执行n次 (n表示Excutor的个数) x } } //转换 //todo Driver 端 执行1次 rid.transform { case rdd => { //todo Driver 端执行 执行m 次, m表示采集周期 rdd.map{ case x => { //TODO Executor 端 执行n次 (n表示Excutor的个数) x } } } } //解析 val dstream: DStream[String] = rid.flatMap(_.split(" ")) val dstream2: DStream[(String, Int)] = dstream.map((_, 1)) //启动SparkStreaming最重要的设置 //1.启动采集器 ssc.start() //2.driver等待采集器执行 ssc.awaitTermination() } }连接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以连接
Stream-Streamwindows-stream to windows-streamstream-dataset val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2) //Stream-dataset joins val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。
与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。
如果StreamingContext中没有设定输出操作,整个context就都不会启动。
在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()
以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。
”prefix-Time_IN_MS[.suffix]”.
以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
将Stream中的数据保存为Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"
这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。
注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。
通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。
比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。 注意:
(1)连接不能写在driver层面;
(2)如果写在foreach则每个RDD都创建,得不偿失;
fix-TIME_IN_MS[.suffix]". Python中目前不可用。
将Stream中的数据保存为Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"
这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。
注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。
通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。
比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。 注意:
(1)连接不能写在driver层面;
(2)如果写在foreach则每个RDD都创建,得不偿失;
(3)增加foreachPartition,在分区创建。使用foreachPartition,在该算子下构造逻辑