目录
二、Spark特征
三、Spark整体工作流程
四、Spark运行方式
五、Spark SQL
1、Spark SQL兼容性
2、Spark SQL编码方式tips
3、Spark SQL数据格式
4、Spark SQL性能优化
1)参数优化
(1)在内存中缓存数据,Spark缓存注册表的方法
(2)性能优化相关参数
(3)在进行表连接的时候,将小表广播可以提高性能,spark2.+中可以调整参数
(4)分区数据的调控,spark任务并行度参数设置
(5)文件与分区
(6)钨丝计划
2)代码优化
(1)在数据统计的时候选择高性能算子
(2)设置合理的数据类型
(3)明确列名
(4)并行处理查询结果
(5)广播join表
(6)写数据库的时候,关闭自动提交,不要每条提交一次,自己手动每个批次提交一次
(7)复用已有的数据
六、Spark MLlib
1、MLlib是Spark的机器学习库
2、Spark MLlib包
3、目前MLlib支持的主要的机器学习算法
4、MLlib基本数据类型
(1)本地向量(Local Vector)
(2)标注点(Labeled Point)
(3)本地矩阵(Local Matrix)
(4)分布式矩阵(Distributed Matrix)
5、 机器学习工作流(ML Pipelines)—— spark.ml包
七、PySpark
八、Spark Streaming与Storm对比
1、Storm适合场景
2、Spark Streaming适合场景
3、Spark Streaming与Storm的优劣分析
九、Spark Streaming集成Kafka
1. RDD(Resilient Distributed Dataset):弹性分布式数据集,是记录的只读分区集合,是Spark的基本数据结构。RDD代表一个不可变、可分区、里面的元素可并行计算的集合。RDD的依赖关系分为两种:窄依赖(Narrow Dependencies)、宽依赖(Wide Dependencies)。Spark会根据宽依赖窄依赖来划分具体的Stage,依赖可以高效地解决数据容错。
窄依赖:每个父RDD的一个Partition最多被子RDD的一个Partition所使用(1:1 或 n:1)。例如map、filter、union等操作都会产生窄依赖。子RDD分区与数据规模无关;宽依赖:一个父RDD的Partition会被多个子RDD的Partition所使用(1:m 或 n:m),例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖。子RDD分区与数据规模有关。2. DAG(Directed Acyclic Graph):有向无环图,在Spark里每一个操作生成一个RDD,RDD之间连成一条边,最后生成的RDD和他们之间的边组成一个有向无环图。有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分成任务集,也就是Stage。
3. RDD与DAG的关系:Spark计算的中间结果默认保存在内存中,Spark在划分Stage的时候会充分考虑在分布式计算中,可流水线计算(pipeline)的部分来提高计算效率,而在这个过程中Spark根据RDD之间依赖关系的不同,将DAG划分成不同的Stage(调度阶段)。对于窄依赖,partition的转换处理在一个Stage中完成计算;对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
4. Application:用户编写的Spark应用程序。
5. Job:一个作业包含多个RDD及作用于相应RDD上的各种操作。
6. Task:任务运行在Executor上的工作单元,是Executor中的一个线程。
三者关系:Application由多个Job组成,Job由多个Stage组成,Stage由多个Task组成,Executor进程以多线程的方式运行Task。
7. Action:该操作将触发基于RDD依赖关系的计算。
8. Transformation:该转换操作具有懒惰执行的特性,它只指定新的RDD和其父RDD的依赖关系,只有当Action操作触发到该依赖的时候,它才被计算。
9. PairRDD:指数据为Tuple2数据类型的RDD,其每个数据的第一个元素被当做key,第二个元素被当做value。
10. 持久化操作:声明对一个RDD进行cache后,该RDD不会被立即缓存,而是等到它第一次因为某个Action操作触发后被计算出来时才进行缓存。可以使用persist明确指定存储级别,常用的存储级别是MEMORY_ONLY和MEMORY_AND_DISK。
11. 共享变量:当Spark集群在许多节点上运行一个函数时,默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。但是,有时候需要在不同节点或者节点和Driver之间共享变量。Spark提供两种类型的共享变量:广播变量、累加器。
广播变量:不可变变量,实现在不同节点不同任务之间共享数据;广播变量在每个节点上缓存一个只读的变量,而不是为每个task生成一个副本,可以减少数据的传输。累加器:主要用于不同节点和Driver之间共享变量,只能实现计数或者累加功能;累加器的值只有在Driver上是可读的,在节点上只能执行add操作。12. 广播变量:
广播变量是一个只读变量,通过它我们可以将一些共享数据集或者大变量缓存在Spark集群中的各个机器上而不用每个task都需要copy一个副本,后续计算可以重复使用,减少了数据传输时网络带宽的使用,提高效率。相比于Hadoop的分布式缓存,广播的内容可以跨作业共享。广播变量要求广播的数据不可变、不能太大但也不能太小(一般几十M以上)、可被序列化和反序列化、并且必须在driver端声明广播变量,适用于广播多个stage公用的数据,存储级别目前是MEMORY_AND_DISK。广播变量存储目前基于Spark实现的BlockManager分布式存储系统,Spark中的shuffle数据、加载HDFS数据时切分过来的block块都存储在BlockManager中。高效性:不同于MapReduce将中间计算结果放入磁盘中,Spark采用内存存储中间计算结果,减少了迭代运算的磁盘IO,并通过并行计算DAG图的优化,减少了不同任务之间的依赖,降低了延迟等待时间。
易用性:Spark提供了超过80种不同的算子,如map,reduce,filter,groupByKey,sortByKey,foreach等;Spark task以线程的方式维护,对于小数据集读取能够达到亚秒级的延迟。
通用性:Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
兼容性:Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。
Spark运行架构特点:
每个Application均有专属的Executor进程,并且该进程在Application运行期间一直驻留;Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可;Task采用了数据本地性和推测执行等优化机制。spark本身是用Scala编写的,spark1.4.0 起支持R语言和Python3编程。
通过spark-shell进入Spark交互式环境,使用Scala语言;通过spark-submit提交Spark应用程序进行批处理;该方法可以提交Scala或Java语言编写的代码编译后生成的jar包,也可以直接提交Python脚本。通过pyspark进入pyspark交互式环境,使用Python语言;该方式可以指定jupyter或者ipython为交互环境。通过zepplin notebook交互式执行;zepplin在jupyter notebook里。安装Apache Toree-Scala内核,可以在jupyter 中运行spark-shell。使用spark-shell运行时,可以添加两个常用的两个参数:master指定使用何种分布类型;jars指定依赖的jar包。Spark SQL 是从shark发展而来。
Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe(后两者用于兼容Hive存储格式)。
从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管。执行计划生成和优化都由Catalyst负责,借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行计划优化策略比Hive要简洁得多。默认的是parquet,可以通过spark.sql.sources.default,修改默认配置。
Spark SQL可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text;
其中hive表使用:
spark 1.6及以前的版本需要hivecontext;Spark2开始只需要创建sparksession增加enableHiveSupport()即可。Spark SQL仅仅会缓存必要的列,并且自动调整压缩算法来减少内存和GC压力。
对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用SQLContext.cacheTable(tableName),或者DataFrame.cache()即可。SparkSQL会用内存列存储的格式进行表的缓存。然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销。SQLContext.uncacheTable(tableName)可以将表从缓存中移除。用SQLContext.setConf(),设置spark.sql.inMemoryColumnarStorage.batchSize参数(默认10000),可以配置列存储的单位。
在任务超多,广播变量在跨stage使用数据的时候才能凸显其真正作用。
对于分布式shuffle操作像reduceByKey和join,父RDD中分区的最大数目。
对于无父RDD的并行化等操作,它取决于群集管理器:
-本地模式:本地计算机上的核心数
-Mesos fine grained mode:8
-其他:所有执行节点上的核心总数或2,以较大者为准
分布式shuffle操作的分区数在实际测试中,设置Shuffle过程中的并行度spark.sql.shuffle.partitions是对sparks SQL的专用设置;spark.default.parallelism只有在处理RDD时才会起作用,对Spark SQL的无效。
spark.sql.files.maxPartitionBytes该值的调整要结合你想要的并发度及内存的大小来进行;spark.sql.files.openCostInBytes说直白一些这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并。
文件格式建议使用parquet或者orc;parquet已经可以达到较大性能。
不必要的情况下,关闭分区字段类型自动推导:
spark.sql.tungsten.enabled,默认是true,自动管理内存。
例如Dataframe使用foreachPartitions将数据写入数据库,不要每个record都去拿一次数据库连接。通常写法是每个partition拿一次数据库连接。
/** * 将统计结果写入MySQL中 * 代码优化: * 在进行数据库操作的时候,不要每个record都去操作一次数据库 * 通常写法是每个partition操作一次数据库 **/ try { videoLogTopNDF.foreachPartition(partitionOfRecords => { val list = new ListBuffer[DayVideoAccessStat] partitionOfRecords.foreach(info => { val day = info.getAs[String]("day") val cmsId = info.getAs[Long]("cmsId") val times = info.getAs[Long]("times") list.append(DayVideoAccessStat(day, cmsId, times)) }) StatDao.insertDayVideoTopN(list) }) }catch{ case e:Exception =>e.printStackTrace() }在Hive数据仓库建设过程中,合理设置数据类型,比如能设置为IINT的,就不要设置成BIGINT,减少数据类型导致的不必要的内存开销。
编写SQL是,尽量给出明确的列明,比如select name from students。不要写成select * from students。
对SparkSQL查询的结果,如果数据量比较大,比如超过了1000条,不要一次性collect()到Driver再处理。使用foreach()算子,并行处理查询结果。
spark.sql.autoBroadcastJoinTreshold,默认10485760(10MB)。再内存够用的情况下,减小其大小,可以将join中的较小的表广播出去,而不用进行网络数据输出。注意:大小不超过设置值的表,都会被广播出去。最有效的,其实就是并行处理查询结果、缓存表和广播join表。
三个统计方法都是只要当天的视频数据,所以在调用方法前过滤出当天视频数据,缓存到内存中。
然后传到三个统计方法中使用。
不要在每个统计方法都去做一次相同的过滤。
val logDF = spark.read.format("parquet") .load("file:///F:\\mc\\SparkSQL\\data\\afterclean") val day = "20170511" /** * 代码优化:复用已有数据 * 既然每次统计都是统计的当天的视频, * 先把该数据拿出来,然后直接传到每个具体的统计方法中 * 不要在每个具体的统计方法中都执行一次同样的过滤 * * 用$列名得到列值,需要隐式转换 import spark.implicits._ * */ import spark.implicits._ val dayVideoDF = logDF.filter($"day" ===day&&$"cmsType"==="video") /** * 将这个在后文中会复用多次的dataframe缓存到内存中 * 这样后文在复用的时候会快很多 * * default storage level (`MEMORY_AND_DISK`). * */ dayVideoDF.cache() //logDF.printSchema() //logDF.show() StatDao.deletaDataByDay(day) //统计每天最受欢迎(访问次数)的TopN视频产品 videoAccessTopNStatDFAPI(spark,dayVideoDF) //按照地势统计每天最受欢迎(访问次数)TopN视频产品 每个地市只要最后欢迎的前三个 cityAccessTopNStat(spark,dayVideoDF) //统计每天最受欢迎(流量)TopN视频产品 videoTrafficsTopNStat(spark,dayVideoDF) //清除缓存 dayVideoDF.unpersist(true)注:Spark官方推荐使用spark.ml,并预期于3.0版本移除spark.mllib。使用 ML Pipeline API可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。
推荐阅读:https://zhuanlan.zhihu.com/p/85612466
存储在单机上,拥有整型、从0开始的索引值以及浮点型的元素值。
其中稠密向量(DenseVector)使用一个双精度浮点型数组来表示其中每一维元素,而稀疏向量(SparseVector)则是基于一个整型索引数组和一个双精度浮点型的值数组。以向量(1.0, 0.0, 3.0)为例:稠密向量表达形式为[1.0, 0.0, 3.0];稀疏向量形式为(3, [0, 2], [1.0, 3.0]),其中3是向量长度,[0, 2]是向量中非0维度的索引值,[1.0, 3.0]是按索引排列的数组元素值。一种带有标签(Label/Response)的本地向量,它可以是稠密或者是稀疏的。
标注点仅在监督学习算法中使用,由于标签是用双精度浮点型来存储的,故标注点类型在回归(Regression)和分类(Classification)问题上均可使用。以二分类为例,正样本的标签为1,负样本的标签为0;而多分类标签则是一个以0开始的索引序列,如:0, 1, 2 ...存储在单机上,具有整型的行、列索引值和双精度浮点型的元素值。其中稠密矩阵将所有元素的值存储在一个列优先(Column-major)的双精度型数组中,而稀疏矩阵则将非零元素以列优先的CSC(Compressed Sparse Column)模式进行存储。
由长整型的行列索引值和双精度浮点型的元素值组成,可以分布式地存储在一个或多个RDD上。
1)行矩阵(Row Matrix)是最基础的分布式矩阵类型。每行是一个本地向量,行索引无实际意义(即无法直接使用)。数据存储在一个由行组成的RDD中,其中每一行都使用一个本地向量来进行存储。由于行是通过本地向量来实现的,故列数(即行的维度)被限制在普通整型。在实际应用中,由于单机处理本地向量的存储和通信代价,行维度更是需要被控制在一个更小的范围之内。
2)索引行矩阵(Index Row Matrix):与行矩阵相似,但它的每一行都带有一个有意义的行索引值,这个索引值可以被用来识别不同行,其数据存储在一个由IndexRow组成的RDD里,即每一行都是一个带长整型索引的本地向量。
3)坐标矩阵(Coordinate Matrix):一个基于矩阵项构成的RDD的分布式矩阵每一个矩阵项(MatrixEntry)都是一个三元组(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是该位置的值。坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候使用。
4)分块矩阵(Block Matrix):分块矩阵将矩阵分成一系列矩阵块,底层由矩阵块构成的RDD来进行数据存储,每一个矩阵块都是一个元组((Int, Int), Matrix),其中(Int, Int)是块的索引,Matrix是在对应位置的子矩阵(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock决定,默认值均为1024。
分块矩阵之间可以进行加法操作和乘法操作,并使用方法validate()来确认分块矩阵是否创建成功。分块矩阵可由索引行矩阵(IndexedRowMatrix)或坐标矩阵(CoordinateMatrix)调用toBlockMatrix(rowsPerBlock, colsPerBlock)方法来进行转换,该方法将矩阵划分成尺寸默认为1024x1024的分块,可以在调用该方法时传入参数来调整分块的尺寸。分块矩阵用于生成分布式矩阵的底层RDD必须是已经确定(Deterministic)的,因为矩阵的尺寸将被存储下来,所以使用未确定的RDD将会导致错误。而且,不同类型的分布式矩阵之间的转换需要进行一个全局的shuffle操作,非常耗费资源。所以,根据数据本身的性质和应用需求来选取恰当的分布式矩阵存储类型是非常重要的。工作原理介绍:https://zhuanlan.zhihu.com/p/33619687
离线安装包下载地址:https://pypi.org/project/pyspark/#history
PySpark安装:https://www.cnblogs.com/liuyutan/p/13289880.html
PySpark入门:https://zhuanlan.zhihu.com/p/85612466
随机森林实战:http://www.doc88.com/p-8436427224488.html
逻辑回归实战:https://blog.csdn.net/baymax_007/article/details/82428984?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-2.add_param_isCf&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-2.add_param_isCf