Spark大数据分析入门笔记

    技术2022-07-10  136

    目录

    二、Spark特征

    三、Spark整体工作流程

    四、Spark运行方式

    五、Spark SQL

    1、Spark SQL兼容性

    2、Spark SQL编码方式tips

    3、Spark SQL数据格式

    4、Spark SQL性能优化

    1)参数优化

    (1)在内存中缓存数据,Spark缓存注册表的方法

    (2)性能优化相关参数

    (3)在进行表连接的时候,将小表广播可以提高性能,spark2.+中可以调整参数

    (4)分区数据的调控,spark任务并行度参数设置

    (5)文件与分区

    (6)钨丝计划

    2)代码优化

    (1)在数据统计的时候选择高性能算子

    (2)设置合理的数据类型

    (3)明确列名

    (4)并行处理查询结果

    (5)广播join表

    (6)写数据库的时候,关闭自动提交,不要每条提交一次,自己手动每个批次提交一次

    (7)复用已有的数据

    六、Spark MLlib

    1、MLlib是Spark的机器学习库

    2、Spark MLlib包

    3、目前MLlib支持的主要的机器学习算法

    4、MLlib基本数据类型

    (1)本地向量(Local Vector)

    (2)标注点(Labeled Point)

    (3)本地矩阵(Local Matrix)

    (4)分布式矩阵(Distributed Matrix)

    5、 机器学习工作流(ML Pipelines)—— spark.ml包

    七、PySpark

    八、Spark Streaming与Storm对比

    1、Storm适合场景

    2、Spark Streaming适合场景

    3、Spark Streaming与Storm的优劣分析

    九、Spark Streaming集成Kafka


    1. RDD(Resilient Distributed Dataset):弹性分布式数据集,是记录的只读分区集合,是Spark的基本数据结构。RDD代表一个不可变、可分区、里面的元素可并行计算的集合。RDD的依赖关系分为两种:窄依赖(Narrow Dependencies)、宽依赖(Wide Dependencies)。Spark会根据宽依赖窄依赖来划分具体的Stage,依赖可以高效地解决数据容错。

    窄依赖:每个父RDD的一个Partition最多被子RDD的一个Partition所使用(1:1 或 n:1)。例如map、filter、union等操作都会产生窄依赖。子RDD分区与数据规模无关;宽依赖:一个父RDD的Partition会被多个子RDD的Partition所使用(1:m 或 n:m),例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖。子RDD分区与数据规模有关。

    2. DAG(Directed Acyclic Graph):有向无环图,在Spark里每一个操作生成一个RDD,RDD之间连成一条边,最后生成的RDD和他们之间的边组成一个有向无环图。有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分成任务集,也就是Stage。

    3. RDD与DAG的关系:Spark计算的中间结果默认保存在内存中,Spark在划分Stage的时候会充分考虑在分布式计算中,可流水线计算(pipeline)的部分来提高计算效率,而在这个过程中Spark根据RDD之间依赖关系的不同,将DAG划分成不同的Stage(调度阶段)。对于窄依赖,partition的转换处理在一个Stage中完成计算;对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

    4. Application:用户编写的Spark应用程序。

    5. Job:一个作业包含多个RDD及作用于相应RDD上的各种操作。

    6. Task:任务运行在Executor上的工作单元,是Executor中的一个线程。

    三者关系:Application由多个Job组成,Job由多个Stage组成,Stage由多个Task组成,Executor进程以多线程的方式运行Task。

    7. Action:该操作将触发基于RDD依赖关系的计算。

    8. Transformation:该转换操作具有懒惰执行的特性,它只指定新的RDD和其父RDD的依赖关系,只有当Action操作触发到该依赖的时候,它才被计算。

    9. PairRDD:指数据为Tuple2数据类型的RDD,其每个数据的第一个元素被当做key,第二个元素被当做value。

    10. 持久化操作:声明对一个RDD进行cache后,该RDD不会被立即缓存,而是等到它第一次因为某个Action操作触发后被计算出来时才进行缓存。可以使用persist明确指定存储级别,常用的存储级别是MEMORY_ONLY和MEMORY_AND_DISK。

    11. 共享变量:当Spark集群在许多节点上运行一个函数时,默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。但是,有时候需要在不同节点或者节点和Driver之间共享变量。Spark提供两种类型的共享变量:广播变量、累加器。

    广播变量:不可变变量,实现在不同节点不同任务之间共享数据;广播变量在每个节点上缓存一个只读的变量,而不是为每个task生成一个副本,可以减少数据的传输。累加器:主要用于不同节点和Driver之间共享变量,只能实现计数或者累加功能;累加器的值只有在Driver上是可读的,在节点上只能执行add操作。

    12. 广播变量:

    广播变量是一个只读变量,通过它我们可以将一些共享数据集或者大变量缓存在Spark集群中的各个机器上而不用每个task都需要copy一个副本,后续计算可以重复使用,减少了数据传输时网络带宽的使用,提高效率。相比于Hadoop的分布式缓存,广播的内容可以跨作业共享。广播变量要求广播的数据不可变、不能太大但也不能太小(一般几十M以上)、可被序列化和反序列化、并且必须在driver端声明广播变量,适用于广播多个stage公用的数据,存储级别目前是MEMORY_AND_DISK。广播变量存储目前基于Spark实现的BlockManager分布式存储系统,Spark中的shuffle数据、加载HDFS数据时切分过来的block块都存储在BlockManager中。

    二、Spark特征

    高效性:不同于MapReduce将中间计算结果放入磁盘中,Spark采用内存存储中间计算结果,减少了迭代运算的磁盘IO,并通过并行计算DAG图的优化,减少了不同任务之间的依赖,降低了延迟等待时间。

    易用性:Spark提供了超过80种不同的算子,如map,reduce,filter,groupByKey,sortByKey,foreach等;Spark task以线程的方式维护,对于小数据集读取能够达到亚秒级的延迟。

    通用性:Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。

    兼容性:Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。

    Spark运行架构特点:

    每个Application均有专属的Executor进程,并且该进程在Application运行期间一直驻留;Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可;Task采用了数据本地性和推测执行等优化机制。

    三、Spark整体工作流程

    构建Spark Application运行环境;SparkContext向资源管理器注册;SparkContext向资源管理器申请运行Executor;资源管理器分配Executor;资源管理器启动Executor;Executor发送心跳至资源管理器;SparkContext构建成DAG图;将DAG图分解成Stage(TaskSet);把Stage(TaskSet)发送给TaskScheduler;Executor向SparkContext申请Task;TaskScheduler将Task发送给Executor运行;同时SparkContext将应用程序代码发放给Executor;Task在Executor上运行,运行完毕释放所有资源。

     


    四、Spark运行方式

    spark本身是用Scala编写的,spark1.4.0 起支持R语言和Python3编程。

    通过spark-shell进入Spark交互式环境,使用Scala语言;通过spark-submit提交Spark应用程序进行批处理;该方法可以提交Scala或Java语言编写的代码编译后生成的jar包,也可以直接提交Python脚本。通过pyspark进入pyspark交互式环境,使用Python语言;该方式可以指定jupyter或者ipython为交互环境。通过zepplin notebook交互式执行;zepplin在jupyter notebook里。安装Apache Toree-Scala内核,可以在jupyter 中运行spark-shell。使用spark-shell运行时,可以添加两个常用的两个参数:master指定使用何种分布类型;jars指定依赖的jar包。

    五、Spark SQL

    Spark SQL 是从shark发展而来。

    1、Spark SQL兼容性

    Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe(后两者用于兼容Hive存储格式)。

    从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管。执行计划生成和优化都由Catalyst负责,借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行计划优化策略比Hive要简洁得多。

    2、Spark SQL编码方式tips

    直接通过Spark编码:需提前声明构建SQLContext或者是SparkSession(Spark2之后建议使用);spark-sql(shell)脚本编码:启动前可以通过bin/spark-sql –help 查看配置参数,调整部署模式资源等;Thriftserver编码:基于HiveServer2实现的一个Thrift服务,旨在无缝兼容HiveServer2。部署好Spark Thrift Server后,可以直接使用hive的beeline访问Spark Thrift Server,执行相关语句。

    3、Spark SQL数据格式

    默认的是parquet,可以通过spark.sql.sources.default,修改默认配置。

    Spark SQL可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text;

    其中hive表使用:

    spark 1.6及以前的版本需要hivecontext;Spark2开始只需要创建sparksession增加enableHiveSupport()即可。

    4、Spark SQL性能优化

    1)参数优化

    (1)在内存中缓存数据,Spark缓存注册表的方法

    版本缓存释放缓存spark2.+spark.catalog.cacheTable("tableName")缓存表spark.catalog.uncacheTable("tableName")清空缓存spark1.+sqlContext.cacheTable("tableName")缓存sqlContext.uncacheTable("tableName") 清空缓存

    Spark SQL仅仅会缓存必要的列,并且自动调整压缩算法来减少内存和GC压力。

    对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用SQLContext.cacheTable(tableName),或者DataFrame.cache()即可。SparkSQL会用内存列存储的格式进行表的缓存。然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销。SQLContext.uncacheTable(tableName)可以将表从缓存中移除。用SQLContext.setConf(),设置spark.sql.inMemoryColumnarStorage.batchSize参数(默认10000),可以配置列存储的单位。

    (2)性能优化相关参数

    属性默认值描述spark.sql.inMemoryColumnarStorage.compressedtrueSpark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式。spark.sql.inMemoryColumnarStorage.batchSize10000缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。spark.sql.files.maxPartitionBytes128 MB读取文件时单个分区可容纳的最大字节数(不过不推荐手动修改,可能在后续版本自动的自适应修改)spark.sql.files.openCostInBytes4M打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。

    (3)在进行表连接的时候,将小表广播可以提高性能,spark2.+中可以调整参数

    属性默认值描述spark.sql.broadcastTimeout300广播等待超时时间,单位秒spark.sql.autoBroadcastJoinThreshold10M用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

    在任务超多,广播变量在跨stage使用数据的时候才能凸显其真正作用。

    (4)分区数据的调控,spark任务并行度参数设置

    属性默认值描述spark.sql.shuffle.partitions200用于配置 join 或aggregate混洗(shuffle)数据时使用的分区数。spark.default.parallelism

    对于分布式shuffle操作像reduceByKey和join,父RDD中分区的最大数目。

    对于无父RDD的并行化等操作,它取决于群集管理器:

    -本地模式:本地计算机上的核心数

    -Mesos fine grained mode:8

    -其他:所有执行节点上的核心总数或2,以较大者为准

    分布式shuffle操作的分区数

    在实际测试中,设置Shuffle过程中的并行度spark.sql.shuffle.partitions是对sparks SQL的专用设置;spark.default.parallelism只有在处理RDD时才会起作用,对Spark SQL的无效。

    (5)文件与分区

    属性默认值描述spark.sql.files.maxPartitionBytes134217728 (128 MB)打包传入一个分区的最大字节,读取文件的时候一个分区接受多少数据;spark.sql.files.openCostInBytes4194304 (4 MB)文件打开是有开销的,Spark 用相同时间能扫描的数据的字节数来衡量打开文件的开销。当将多个文件写入同一个分区的时候该参数有用。该参数设置较大,有小文件的分区会比大文件分区处理速度更快(优先调度)。

    spark.sql.files.maxPartitionBytes该值的调整要结合你想要的并发度及内存的大小来进行;spark.sql.files.openCostInBytes说直白一些这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并。

    文件格式建议使用parquet或者orc;parquet已经可以达到较大性能。

    不必要的情况下,关闭分区字段类型自动推导:

    (6)钨丝计划

    spark.sql.tungsten.enabled,默认是true,自动管理内存。

    2)代码优化

    (1)在数据统计的时候选择高性能算子

    例如Dataframe使用foreachPartitions将数据写入数据库,不要每个record都去拿一次数据库连接。通常写法是每个partition拿一次数据库连接。

    /** * 将统计结果写入MySQL中 * 代码优化: * 在进行数据库操作的时候,不要每个record都去操作一次数据库 * 通常写法是每个partition操作一次数据库 **/ try { videoLogTopNDF.foreachPartition(partitionOfRecords => { val list = new ListBuffer[DayVideoAccessStat] partitionOfRecords.foreach(info => { val day = info.getAs[String]("day") val cmsId = info.getAs[Long]("cmsId") val times = info.getAs[Long]("times") list.append(DayVideoAccessStat(day, cmsId, times)) }) StatDao.insertDayVideoTopN(list) }) }catch{ case e:Exception =>e.printStackTrace() }

    (2)设置合理的数据类型

    在Hive数据仓库建设过程中,合理设置数据类型,比如能设置为IINT的,就不要设置成BIGINT,减少数据类型导致的不必要的内存开销。

    (3)明确列名

    编写SQL是,尽量给出明确的列明,比如select name from students。不要写成select * from students。

    (4)并行处理查询结果

    对SparkSQL查询的结果,如果数据量比较大,比如超过了1000条,不要一次性collect()到Driver再处理。使用foreach()算子,并行处理查询结果。

    (5)广播join表

    spark.sql.autoBroadcastJoinTreshold,默认10485760(10MB)。再内存够用的情况下,减小其大小,可以将join中的较小的表广播出去,而不用进行网络数据输出。注意:大小不超过设置值的表,都会被广播出去。最有效的,其实就是并行处理查询结果、缓存表和广播join表。

    (6)写数据库的时候,关闭自动提交,不要每条提交一次,自己手动每个批次提交一次

    var connection:Connection = null var pstmt : PreparedStatement = null try{ connection = MySQLUtils.getConnection() connection.setAutoCommit(false)//关闭自动提交 val sql = "insert into day_video_access_topn_stat(day,cms_id,times) values(?,?,?)" pstmt = connection.prepareStatement(sql) for(ele <- list){ pstmt.setString(1,ele.day) pstmt.setLong(2,ele.cmsId) pstmt.setLong(3,ele.times) //加入到批次中,后续再执行批量处理 这样性能会好很多 pstmt.addBatch() } //执行批量处理 pstmt.executeBatch() connection.commit() //手工提交 }catch { case e :Exception =>e.printStackTrace() }finally { MySQLUtils.release(connection,pstmt) }

    (7)复用已有的数据

    三个统计方法都是只要当天的视频数据,所以在调用方法前过滤出当天视频数据,缓存到内存中。

    然后传到三个统计方法中使用。

    不要在每个统计方法都去做一次相同的过滤。

    val logDF = spark.read.format("parquet") .load("file:///F:\\mc\\SparkSQL\\data\\afterclean") val day = "20170511" /** * 代码优化:复用已有数据 * 既然每次统计都是统计的当天的视频, * 先把该数据拿出来,然后直接传到每个具体的统计方法中 * 不要在每个具体的统计方法中都执行一次同样的过滤 * * 用$列名得到列值,需要隐式转换 import spark.implicits._ * */ import spark.implicits._ val dayVideoDF = logDF.filter($"day" ===day&&$"cmsType"==="video") /** * 将这个在后文中会复用多次的dataframe缓存到内存中 * 这样后文在复用的时候会快很多 * * default storage level (`MEMORY_AND_DISK`). * */ dayVideoDF.cache() //logDF.printSchema() //logDF.show() StatDao.deletaDataByDay(day) //统计每天最受欢迎(访问次数)的TopN视频产品 videoAccessTopNStatDFAPI(spark,dayVideoDF) //按照地势统计每天最受欢迎(访问次数)TopN视频产品 每个地市只要最后欢迎的前三个 cityAccessTopNStat(spark,dayVideoDF) //统计每天最受欢迎(流量)TopN视频产品 videoTrafficsTopNStat(spark,dayVideoDF) //清除缓存 dayVideoDF.unpersist(true)

    六、Spark MLlib

    1、MLlib是Spark的机器学习库

    算法工具:常用的学习算法,如分类、回归、聚类和协同过滤;特征化公交:特征提取、转化、降维和选择公交;管道(Pipeline):用于构建、评估和调整机器学习管道的工具;持久性:保存和加载算法,模型和管道;实用工具:线性代数,统计,数据处理等工具。

    2、Spark MLlib包

    spark.mllib包含基于RDD的原始算法API。spark.ml则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。二者区别:技术角度上,面向的数据集类型不一样:ML的API是面向Dataset的(Dataframe是Dataset的子集,也就是Dataset[Row]), mllib是面对RDD的。Dataset和RDD有啥不一样呢?Dataset的底端是RDD。Dataset对RDD进行了更深一层的优化,比如说有sql语言类似的黑魔法,Dataset支持静态类型分析所以在compile time就能报错,各种combinators(map,foreach等)性能会更好,等等。编程过程上,构建机器学习算法的过程不一样:ML提倡使用pipelines,把数据想成水,水从管道的一段流入,从另一端流出: 大体概念:DataFrame => Pipeline => A new DataFrame Pipeline: 是由若干个Transformers和Estimators连起来的数据处理过程 Transformer:入:DataFrame => 出: Data Frame Estimator:入:DataFrame => 出:Transformer

    注:Spark官方推荐使用spark.ml,并预期于3.0版本移除spark.mllib。使用 ML Pipeline API可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。

    3、目前MLlib支持的主要的机器学习算法

     离散型数据连续型数据监督学习分类、逻辑回归、SVM、决策树、随机森林、GBT、朴素贝叶斯、多层感知机(Multi-Layer Perceptron)、One-Vs-Rest回归、线性回归、决策树、随机森林、GBT、AFT生存回归、保序回归(Isotonic Regression)无监督学校聚类、K-Means、高斯混合(Gaussian Mixture)、LDA、幂迭代聚类(Power iteration)、二分K均值(Bisecting K-Means)降维、矩阵分解(Matrix Factorization)、主成分分析(PCA)、基于奇异值分解(SVD)、最小二乘法(ALS)、加权最小二乘法(WLS)

    推荐阅读:https://zhuanlan.zhihu.com/p/85612466

    4、MLlib基本数据类型

    (1)本地向量(Local Vector)

    存储在单机上,拥有整型、从0开始的索引值以及浮点型的元素值。

    其中稠密向量(DenseVector)使用一个双精度浮点型数组来表示其中每一维元素,而稀疏向量(SparseVector)则是基于一个整型索引数组和一个双精度浮点型的值数组。以向量(1.0, 0.0, 3.0)为例:稠密向量表达形式为[1.0, 0.0, 3.0];稀疏向量形式为(3, [0, 2], [1.0, 3.0]),其中3是向量长度,[0, 2]是向量中非0维度的索引值,[1.0, 3.0]是按索引排列的数组元素值。

    (2)标注点(Labeled Point)

    一种带有标签(Label/Response)的本地向量,它可以是稠密或者是稀疏的。

    标注点仅在监督学习算法中使用,由于标签是用双精度浮点型来存储的,故标注点类型在回归(Regression)和分类(Classification)问题上均可使用。以二分类为例,正样本的标签为1,负样本的标签为0;而多分类标签则是一个以0开始的索引序列,如:0, 1, 2 ...

    (3)本地矩阵(Local Matrix)

    存储在单机上,具有整型的行、列索引值和双精度浮点型的元素值。其中稠密矩阵将所有元素的值存储在一个列优先(Column-major)的双精度型数组中,而稀疏矩阵则将非零元素以列优先的CSC(Compressed Sparse Column)模式进行存储。

    (4)分布式矩阵(Distributed Matrix)

    由长整型的行列索引值和双精度浮点型的元素值组成,可以分布式地存储在一个或多个RDD上。

    1)行矩阵(Row Matrix)是最基础的分布式矩阵类型。每行是一个本地向量,行索引无实际意义(即无法直接使用)。数据存储在一个由行组成的RDD中,其中每一行都使用一个本地向量来进行存储。由于行是通过本地向量来实现的,故列数(即行的维度)被限制在普通整型。在实际应用中,由于单机处理本地向量的存储和通信代价,行维度更是需要被控制在一个更小的范围之内。

    2)索引行矩阵(Index Row Matrix):与行矩阵相似,但它的每一行都带有一个有意义的行索引值,这个索引值可以被用来识别不同行,其数据存储在一个由IndexRow组成的RDD里,即每一行都是一个带长整型索引的本地向量。

    3)坐标矩阵(Coordinate Matrix):一个基于矩阵项构成的RDD的分布式矩阵每一个矩阵项(MatrixEntry)都是一个三元组(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是该位置的值。坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候使用。

    4)分块矩阵(Block Matrix):分块矩阵将矩阵分成一系列矩阵块,底层由矩阵块构成的RDD来进行数据存储,每一个矩阵块都是一个元组((Int, Int), Matrix),其中(Int, Int)是块的索引,Matrix是在对应位置的子矩阵(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock决定,默认值均为1024。

    分块矩阵之间可以进行加法操作和乘法操作,并使用方法validate()来确认分块矩阵是否创建成功。分块矩阵可由索引行矩阵(IndexedRowMatrix)或坐标矩阵(CoordinateMatrix)调用toBlockMatrix(rowsPerBlock, colsPerBlock)方法来进行转换,该方法将矩阵划分成尺寸默认为1024x1024的分块,可以在调用该方法时传入参数来调整分块的尺寸。分块矩阵用于生成分布式矩阵的底层RDD必须是已经确定(Deterministic)的,因为矩阵的尺寸将被存储下来,所以使用未确定的RDD将会导致错误。而且,不同类型的分布式矩阵之间的转换需要进行一个全局的shuffle操作,非常耗费资源。所以,根据数据本身的性质和应用需求来选取恰当的分布式矩阵存储类型是非常重要的。

    5、 机器学习工作流(ML Pipelines)—— spark.ml包

    工作原理介绍:https://zhuanlan.zhihu.com/p/33619687

    七、PySpark

    离线安装包下载地址:https://pypi.org/project/pyspark/#history

    PySpark安装:https://www.cnblogs.com/liuyutan/p/13289880.html

    PySpark入门:https://zhuanlan.zhihu.com/p/85612466

    随机森林实战:http://www.doc88.com/p-8436427224488.html

    逻辑回归实战:https://blog.csdn.net/baymax_007/article/details/82428984?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-2.add_param_isCf&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-2.add_param_isCf

    八、Spark Streaming与Storm对比

    1、Storm适合场景

    建议在纯实时,要求1秒以内延迟的场景下使用,比如实时金融系统,要求纯实时进行金融交易和分析;对于实时计算功能的事务机制可靠性要求很高,即数据的处理完全精准;针对高峰低峰时间段,动态调整实时计算程序的并行度,以最大限度利用集群资源(通常是在小型公司,集群资源紧张的情况);在一个大数据应用系统中,它纯粹被用作实时计算,不需要在中间执行SQL交互式查询、复杂的transformation算子等。

    2、Spark Streaming适合场景

    不要求纯实时,不要求强大可靠的事务机制,不要求动态调整并行度;针对整个项目进行宏观的考虑,即,如果一个项目除了实时计算之外,还包括了离线批处理、交互式查询等业务功能,而且实时计算中,可能还会牵扯到高延迟批处理、交互式查询等功能,那么应该首选Spark生态,用Spark Core开发离线批处理,用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性。

    3、Spark Streaming与Storm的优劣分析

    吞吐量:Spark Streaming优于Storm;实时延迟:Storm优于Spark Streaming;事务机制、健壮性 / 容错性、动态调整并行度等特性:Storm优于Spark Streaming;对实时处理出来的中间数据,立即在程序中无缝进行延迟批处理、交互式查询等操作:Spark Streaming优于Storm。

    九、Spark Streaming集成Kafka

     

     

     

    Processed: 0.013, SQL: 9