Spark葵花宝典:一小时速成Spark

    技术2023-12-30  74

    Spark简介

    什么是Spark?

    Spark是一个快速、分布式、可扩展(随时可以进行节点的扩充)、容错(节点宕机了。那么它可以重新构建恢复这个数据)的集群计算框架。低延迟的复杂分析,因为Spark的低延迟,延迟低是因为Spark是在内存里面计算的。(Spark已经成为Apache软件基金会旗下的顶级开源项目)

    为什么要使用 Spark 替换 MapReduce?

    MapReudce不适合迭代和交互式任务,Spark主要为交互式查询和迭代算法设计,支持内存存储和高效的容错恢复。Spark拥有MapReduce具有的优点,但不同于MapReduce,Spark中间输出结果可以保存在内存中,减少读写HDFS的次数。

    Spark特点:

    快速:因为 Spark 是在内存里面计算的易用性:Spark 支持使用Scala、Python、Java及R语言快速编写应用。通用性:Spark可以与SQL、Streaming及复杂的分析良好结合。随处运行:

    Spark的生态圈:(了解)

    Spark Core:Spark核心,提供底层框架及核心支持。包含Spark的基本功能,包括任务调度、内存管理、容错机制等。Spark Core内部定义了RDDS,并提供了很多API来创建和操作RDDTachyon:Tachyon是一个分布式内存文件系统,可以理解为内存中的HDFS。BlinkDB:一个用于在海量数据上运行交互式SQL查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。Spark SQL:可以执行SQL查询,包括基本的SQL语法和HiveQL语法。读取的数据源包括Hive表、Parquent文件、JSON数据、关系数据库(如MySQL)等。Spark Streaming:流式计算。比如,一个网站的流量是每时每刻都在发生的,如果需要知道过去15分钟或一个小时的流量,则可以使用Spark Streaming来解决这个问题。MLBase:MLBase是Spark生态圈的一部分,专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLBase。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。MLlib:MLBase的一部分,MLlib是Spark的数据挖掘算法库,实现了一些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化。GraphX:图计算的应用在很多情况下处理的数据都是很庞大的,比如在移动社交上面的关系等都可以用图相关算法来进行处理和挖掘,但是如果用户要自行编写相关的图计算算法,并且要在集群中应用,那么难度是非常大的。而使用Spark GraphX就可以解决这个问题,它里面内置了很多的图相关算法。SparkR:SparkR是AMPLab发布的一个R开发包,使得R摆脱单机运行的命运,可以作为Spark的Job运行在集群上,极大地扩展了R的数据处理能力。

    Scala编程基础

    什么是Scala?

    Scala是一门多范式的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性。因此Scala是一种纯面向对象的语言,每个值都是对象。同时Scala也是一种函数式编程语言,其函数也能当成值来使用。由于Scala整合了面向对象语言和函数式编程的特性,Scala相对于Java、C#、C++等其他语言更加简洁。Scala源代码被编译成Java字节码,所以它可以运行于JVM之上,并可以调用现有的Java类库。Scala一开始就打算基于Java的生态系统发展自身,而这令Scala受益匪浅。

    Scala特点:

    面向对象:把相关的数据和方法组织为一个整体来看待,从更高的层次来进行系统建模,更贴近事物的自然运行模式。函数式编程:函数是一等公民,允许自定义控制语句、改造语言。静态类型:编译时进行检查。可扩展的: 隐式类: 允许给已有的类型添加扩展方法字符串插值: 可以让用户使用自定义的插值器进行扩展

    Scala的编程模式:

    交互式编程:REPL(Read Eval Print Loop:交互式解释器) 读取:读取用户输入,解析输入的内容。执行:执行输入的scala代码打印:输出结果循环 :循环操作以上步骤直到用户确认退出。 脚本编程:使用文本编辑器创建一个 .scala 的文件来执行代码 使用 scalac 命令编译使用scala命令执行程序

    数据类型

    基本数据类型:

    数据类型描述Byte8位有符号补码整数。数值区间为 -128 到 127Short16位有符号补码整数。数值区间为 -32768 到 32767Int32位有符号补码整数。数值区间为 -2147483648 到 2147483647Long64位有符号补码整数。数值区间为 -9223372036854775808 到 9223372036854775807Float32 位, IEEE 754 标准的单精度浮点数Double64 位 IEEE 754 标准的双精度浮点数Char16位无符号Unicode字符, 区间值为 U+0000 到 U+FFFFString字符序列Booleantrue或falseUnit表示无值,和其他语言中void等同。用作不返回任何结果的方法的结果类型。Unit只有一个实例值,写成()。Nullnull 或空引用NothingNothing类型在Scala的类层级的最底端;它是任何其他类型的子类型。AnyAny是所有其他类的超类AnyRefAnyRef类是Scala里所有引用类(reference class)的基类

    常用的基本数据类型:

    Int:整型数值,这意味着它只能保存整数。

    Float与Double:都是表示浮点数,浮点数后面有f或者F后缀时,表示这是一个Float类型,否则就是一个Double类型的。

    String:字符串,其用法是用双引号包含一系列字符。

    Boolean:只能保存两个特殊的值,ture和flase。

    Unit:在Java中创建一个方法时经常用void表示该方法无返回值,而Scala中没有void关键字,Scala中用Unit表示无值,等同于Java中的void。

    数据类型中还有一个:

    Range类型:是一种数据范围或序列, 支持Range的类型包括Int、Long、Float、Double、Char、BigInt和BigDecimal。

    变量(var)和常量(val):

    var:可以在它的声明周期中被多次赋值

    val:一旦初始化了,val就不能再被赋值

    注意:val 对并发或分布式编程很有好处

    判断

    if (表达式) { //代码块 } else if(表达式){ //代码块 }else{ //代码块 }

    数组

    定长数组

    定义定长数组的两种方式:

    val arr = new Array[Int](5)val arr = Array(1,2,3,4,5)

    操作定长数组:

    arr.length:返回数组长度arr.head:返回数组第一个元素arr.tail:返回除了第一个元素arr.isEmpty:判断数组是否为空arr.contains(数据):判断数组是否包含某个元素

    变长数组

    数组缓冲ArrayBuffer

    定义变长数组之前需要先导入包:

    import scala.collection.mutable.ArrayBuffer

    定义变长数组:

    val bufArr = new ArrayBuffer[Int]()

    操作变长数组:定长数组的操作均可以使用

    bufArr += 1:在末尾添加元素

    bufArr.trimEnd(n):移除最后的n个元素

    bufArr.insert(n-1,x):中间插入数组元素(第n个元素前插入x元素)。

    bufArr.insert(n-1,x,y,z):插入多个元素

    bufArr.remove(n-1):删除数组第n个元素

    bufArr.toArray:变成定长数组

    数组其他常用对的方法:

    需要导入包:import Array._

    concat(arr1,arr2):合并数组 var arr1 = Array(1,2) var arr2 = Array(3,4) var arr3 = concat(arr1,arr2) fill(len, data):返回数组,长度为第一个参数指定,同时每个元素使用第二个参数进行填充。 val arr:Array[Int]=fill(3)(2) ofDim(len):创建指定长度的数组,或创建二维数组 val arr1:Array[Int]=ofDim(3) val arr2:Array[Array[Int]]=ofDim(3,3) //二维数组 range(start,end,step):创建指定区间内的数组,step 为每个元素间的步长 val arr3:Array[Int]=range(1,10,2) val arr4:Array[Int]=range(1,10)

    连接数组:连接两个数组既可以使用操作符++,也可以使用concat方法。但是使用concat方法需要先使用import Array._引入包。

    创建多维数组:

    var arrInt = Array(Array(2,3),Array(5,6))

    方法和函数

    方法和函数:二者在语义上的区别很小。Scala 方法是类的一部分,而函数是一个对象可以赋值给一个变量。换句话来说在类中定义的函数即是方法。

    使用 val 语句可以定义函数,def 语句定义方法。

    方法

    方法的定义方式:

    object Test { //方法 def addInt( a:Int, b:Int) :Int = { var sum:Int =0 sum = a + b return sum } } //调用方法形式1:带类名调用 Test.addInt(1,2)

    //既不带参数也不返回结果 def printMe() : Unit = { println("Hello, Scala!") } //调用方法形式2:直接调用 printMe()

    方法的参数:

    可变参数:Scala允许方法的最后一个参数可以是重复的,即不需要指定参数的个数,可以向方法传入可变长度参数列表。 def printStrings( args:String*) = { var i :Int = 0; for (arg <- args){ println("arg value[" + i + "] = " + arg); i += 1; } } printStrings("abc", "cd") 默认值参数:在定义方法的过程中可以给参数赋默认值。这种情况下,如果调用方法时不传入参数则使用默认值,如果传入参数则根据实际参数值进行计算。 def addInt ( a:Int=5, b:Int=7) :Int = { var sum:Int = 0 sum = a+b return sum }

    函数

    函数:函数是一个对象可以赋值给一个变量,使用val进行定义。函数可作为一个参数传入到方法中,而方法不行。

    函数的定义:

    val f = (x:Int,y:Int) => { x + y x + y + 10 }

    循环

    while循环

    var a = 10; while( a < 20 ){ println( "Value of a: " + a ); a = a + 1; }

    注意:++i和i++在Scala里不起作用,要在得到同样效果,必须要么写成i=i+1,要么写成i+=1

    do while循环

    var a = 10; do{ println( "Value of a: " + a ); a = a + 1; }while( a < 20 )

    for循环

    var a = 0; for( a <- 1 to 10){ println( "Value of a: " + a ); } //----等同于 for( a <- 1 until 10){ println( "Value of a: " + a ); }

    注意:左箭头 <- 用于为变量 a 赋值。

    for循环和数组:

    var a = 0; val numList = Array(1,2,3,4,5,6); for( a <- numList ){ println( "Value of a: " + a ); }

    for循环和集合

    var a = 0; val numList = List(1,2,3,4,5,6); for( a <- numList ){ println( "Value of a: " + a ); }

    for循环和过滤

    语法:

    for( var x <- List if condition1; if condition2... ){ statement(s); }

    示例:

    var a = 0; val numList = List(1,2,3,4,5,6,7,8,9,10); for( a <- numList if a != 3; if a < 8 ){ println( "Value of a: " + a ); }

    for循环和yield

    语法:将 for 循环的返回值作为一个变量存储

    var retVal = for{ var x <- List if condition1; if condition2... }yield x

    示例:

    var a = 0; val numList = List(1,2,3,4,5,6,7,8,9,10); var retVal = for{ a <- numList if a != 3; if a < 8 }yield a for( a <- retVal){ println( "Value of a: " + a ); }

    foreach方法

    val arr = Array(1,2,3,4) arr.foreach((x:Int) => println(x)) //输出结果: 1,2,3,4

    集合

    Scala 集合分为可变的和不可变的集合。

    可变集合:可以在适当的地方被更新或扩展。这意味着你可以修改,添加,移除一个集合的元素;

    不可变集合:相比之下,永远不会改变。不过,你仍然可以模拟添加,移除或更新操作。但是这些操作将在每一种情况下都返回一个新的集合,同时使原来的集合不发生改变;如List

    Collection : List、Set、Map、Tuple

    List列表

    定义列表:

    方式1:

    val list:List[String] = List("baidu", "google")

    方式2:Nil 代表一个空,最后一个必须为Nil ,否则会报错

    val list:List[Int] = 1::2::3::Nil

    链接列表:

    ::: 运算符是将集合中的每一个元素加入到集合中去,并且左右两边都必须是集合. val list1 = "Baidu"::("google"::("bing"::Nil)) val list2 = "Facebook"::"taoboa"::Nil val list3 = list1:::list2 使用List.concat(list1, list2) val list1 = "Baidu"::("google"::("bing"::Nil)) val list2 = "Facebook"::"taoboa"::Nil val list4 = List.concat(list1,list2)

    操作列表:

    :::list1.::("baidu")::::list1.:::(("baidu"::Nil))head:获取列表的第一个元素init:返回所有元素,除了最后一个drop(n):丢弃前n个元素,并返回新列表filter:输出符号指定条件的所有元素。 val num:List[Int] = List(1,2,3,4,5) val list = num.filter(x => x%2==0) //List(2, 4) map:通过给定的方法将所有元素重新计算 val num:List[Int] = List(1,2,3,4,5) val list = num.map(x => x*2) //List(2, 4, 6, 8, 10) mkString:使用分隔符将列表所有元素作为字符串显示 val num:List[Int] = List(1,2,3,4,5) val list = num.mkString(",") //1,2,3,4,5 foreach:将函数应用到列表的所有元素 val num:List[Int] = List(1,2,3,4,5) val list = num.foreach(x=>println(x)) //函数 //---代码块--- val list = num.foreach(x=>{ println(x) })

    ListBuffer可变列表

    使用可变列表首先要导入:

    import scala.collection.mutable.ListBuffer

    定义可变列表:

    val list = ListBuffer[Int](1, 2, 3)

    操作可变列表:

    添加元素 +=:list += 4append(data):list.append(4) 删除元素 remove(index):list.remove(0)删除下表为0的元素

    Set集合

    默认情况下,Scala 使用的是不可变集合,如果你想使用可变集合,需要引用 scala.collection.mutable.Set 包。Set是没有重复的对象集合,所有的元素都是唯一的。 定义集合:

    val set = Set(1,2,3,3) // 元素唯一性 val set = Set(1,2,3) //二者结果相同

    操作集合:

    head:返回集合第一个元素tail:返回一个集合,包含除了第一元素之外的其他元素isEmpty:判断是否是一个空数组foreach:遍历集合 val set = Set(1,2,3,3) set.foreach(x => { println(x)} ) 添加元素(前提是引入了上面的包变成可变集合) add(data)+= 删除元素(前提是引入了上面的包变成可变集合) remove(data)-= 链接两个集合 ++set1.++(set2) val set1 = Set(1,2,3,4) val set2 = Set(3,4,5,6) val set3 = set1 ++ set2 val set4 = set1.++(set2) 查看两个集合的交集元素 set1.&(set2)set.intersect(set2) val set1 = Set(1,2,3,4) val set2 = Set(3,4,5,6) val set5 = set1.&(set2) val set6 = set1.intersect(set2) 查找集合中最大与最小元素 set.maxset.min

    Map映射

    Map(映射)是一种可迭代的键值对(key/value)结构。所有的值都可以通过键来获取。

    同样的 Map 有两种类型,可变与不可变,区别在于可变对象可以修改它,而不可变对象不可以。

    默认情况下 Scala 使用不可变 Map。如果你需要使用可变集合,你需要显式的引入

    import scala.collection.mutable.Map

    定义 Map 映射:

    val map1 = Map("a" -> 1, "b" ->2, "c" ->3)

    操作 Map 映射:

    keys

    values

    isEmpty

    foreach

    val map = Map("a" -> 1, "b" ->2, "c" ->3) println(map.keys) //Set(a, b, c) println(map.values) //MapLike.DefaultValuesIterable(1, 2, 3) println(map.isEmpty) //false map.foreach(x=>println(x)) 添加映射 += val map1 = Map("a" -> 1, "b" ->2, "c" ->3) map1 += ("d" -> 4) var map = Map[String,Int]("a" -> 1,"b" -> 2) //引用可变,支持读写操作; map += ("c" -> 3) //新增 println(map) val map2 = Map[String,Int]("a" -> 1,"b" -> 2) //引用不可变,只能第一次写入值,之后只能读取; map2 += ("c" -> 3) //此时不能加,直接报错; val map3 = scala.collection.mutable.Map[String,String]() //引用不可变,支持读写操作; Map3 += (“c” -> 3) 链接两个映射:如果 Map 中存在相同的 key,合并后的 Map 中的 value 会被最右边的 Map 的值所代替。 ++map.++() val map1 = Map("a" -> 1, "b" ->2, "c" ->3) val map2 = Map("c" -> 4, "d" ->5) val map3 = map1 ++ map2 val map4 = map1.++(map2)

    Tuple元组

    与列表一样,元组也是不可变的,但与列表不同的是元组可以包含不同类型的元素。

    Scala 支持的元组最大长度为 22。

    定义 Tuple 元组:元组的值是通过将单个的值包含在圆括号中构成的。

    val t = (1, 3.14, "Fred") val t = new Tuple3(1,2,"string")

    使用t._1 访问第一个元素, t._2 访问第二个元素:

    val t = (1, 3.14, "Fred") println(t._1) //1 println(t._2) //3.14 println(t._3) //"Fred"

    补充:Scala Option(选项)类型用来表示一个值是可选的(有值或无值)。

    迭代器

    Scala Iterator(迭代器)不是一个集合,它是一种用于访问集合的方法。迭代器的两个基本操作是 next 和 hasNext。

    val map = Map("a" -> 1, "b" ->2, "c" ->3) val iterator = map.iterator while (iterator.hasNext){ val ele = iterator.next() println("key:" + ele._1 + ";value:" + ele._2) }

    函数组合器

    map:通过一个函数重新计算列表中所有元素,并且返回一个相同数目元素的新列表。

    foreach:foreach没有返回值,foreach只是为了对参数进行作用。

    filter:过滤移除使得传入的函数的返回值为false的元素。

    val num:List[Int] = List(1,2,3,4,5) num.map(x => x*2) num.foreach(x => println(x*x + "\t")) num.filter(x => x%2 ==0) flatten:把嵌套的结构展开,或者说flatten可以把一个二维的列表展开成一个一维的列表 val list = List(List(1,2,3), List(3,4,5)) var newlist = list.flatten //List(1, 2, 3, 3, 4, 5) flatMap:结合了map和flatten的功能,接收一个可以处理嵌套列表的函数,然后把返回结果连接起来。 val list:List[List[Int]] = List(List(1,2,3), List(3,4,5)) var newlist = list.flatMap(x => x.map(_*2)) //List(2, 4, 6, 6, 8, 10) groupBy:对集合中的元素进行分组操作,结果得到的是一个Map。 val intList:List[Int] = List(1,2,3,4,5,6) var newlist = intList.groupBy(x=>x%2==0) //结果: Map(false -> List(1, 3, 5), true -> List(2, 4, 6))

    访问修饰符

    没有指定访问修饰符,默认情况下,访问级别都是 public

    私有成员(Private):在嵌套类情况下,外层类甚至不能访问被嵌套类的私有成员。 class Outer{ class Inner{ private def f(){println("f")} class InnerMost{ f() } } (new Inner).f() //error } 保护成员(Protected):Protected只允许保护成员在定义了该成员的的类的子类中被访问。公共成员(Public):在任何地方都可以被访问。 class Outer { class Inner { def f() { println("f") } class InnerMost { f() // correct } } (new Inner).f() // correct }

    Scala是一种纯粹的面向对象语言,两个重要的概念:类和对象。类是对象的抽象,也可以把类理解为模板,对象才是真正的实体。Scala中的类不声明为public。Scala 的类定义可以有参数,称为类参数;类参数在整个类中都可以访问。

    类的使用:

    class Point(xc: Int, yc: Int) { var x: Int = xc var y: Int = yc def move(dx: Int, dy: Int) { x = x + dx y = y + dy println ("x : " + x); println ("y : " + y); } }

    一个Scala源文件中可以有多个类。可以使用 new 来实例化对象,并访问类中的方法和变量

    //scala文件 import java.io._ class Point(xc: Int, yc: Int) { var x: Int = xc var y: Int = yc def move(dx: Int, dy: Int) { x = x + dx y = y + dy println ("x 的坐标点: " + x); println ("y 的坐标点: " + y); } } object Test { def main(args: Array[String]) { val pt = new Point(10, 20); // 移到一个新的位置 pt.move(10, 10); } }

    类的继承:Scala 使用 extends 关键字来继承一个类,子类会继承父类的所有属性和方法,Scala 只允许继承一个父类。

    Scala继承一个基类跟Java很相似, 但我们需要注意以下几点:

    1、重写一个非抽象方法必须使用override修饰符。

    2、只有主构造函数才可以往基类的构造函数里写参数。

    3、在子类中重写超类的抽象方法时,你不需要使用override关键字

    class Location(val xc: Int, val yc: Int, val zc :Int) extends Point(xc, yc){ var z: Int = zc override def move(dx: Int, dy: Int) { x = x + dx + 100 y = y + dy + 100 println ("x location : " + x); println ("y location : " + y); } def move(dx: Int, dy: Int, dz: Int) { x = x + dx y = y + dy z = z + dz println ("x : " + x); println ("y : " + y); println ("z : " + z); } } object Test { def main(args: Array[String]) { val loc = new Location(10, 20, 15); loc.move(10, 10); } }

    子类继承父类中已经实现的方法需要使用关键字override,子类继承父类中未实现的方法可以不用override关键字。

    //Cat Animal 父子类方法的覆盖 //使用eat方法,使用{}包容代码,在方法里使用println abstract class Animal { def showName(str:String)={println("animal")} def eat(food:String) } class Cat extends Animal { override def showName(str:String)={println("cat")} //子类继承父类中已经实现的方法需要使用关键字`override` def eat(food:String) = {println("fish")}//子类继承父类中未实现的方法可以不用`override`关键字。 }

    单例对象:在整个程序中只有这么一个实例。Scala中没有static关键字,因此Scala的类中不存在静态成员。 Scala中使用单例模式时需要使用object定义一个单例对象。object 对象与类的区别在于object 对象不能带参数。包含 main 方法的 object 对象可以作为程序的入口点。

    伴生对象:是一种特殊的单例对象,是一种相对概念。需要两个条件:

    在同一个源文件中

    对象名和类名相同

    类和伴生对象之间可以相互访问私有的方法和属性。

    构造器

    构造器分为两类:主构造器(只有一个),辅助构造器(有多个)

    主构造器直接在类名后面定义,每个类都有主构造器,主构造器的参数直接放在类名后,与类交织在一起 如果没有定义构造器,类会有一个默认的空参构造器

    class Point(xc: Int, yc: Int) {//主构造器 var x: Int = xc var y: Int = yc def move(dx: Int, dy: Int) { x = x + dx y = y + dy println ("x 的坐标点: " + x); println ("y 的坐标点: " + y); } }

    辅助构造器定义,使用def this,必须调用主构造器,或者其他构造器。

    class Person { //空参构造器 var name:String = "Tom" var sex:String = "man" val age:Int =18 println("main constructor") def this(name:String){ //辅助构造器 this() println("name constructor") } def this(name:String, sex:String) { this(name) println("name sex construtor") } }

    注意:

    有两类构造器,主构造器和辅构造器

    辅构造器的参数不能和主构造器的参数完全一致(参数个数、类型、顺序)

    可以定义空参的辅助构造器,但是主构造器参数必须进行初始化赋值

    辅构造器的作用域只在方法中,主构造器的作用域是类中除去成员属性和成员方法外的所有范围

    IDE中的Spark程序

    SparkContext 介绍

    ​ 任何Spark程序都是以SparkContext对象开始的,因为SparkContext是Spark应用程序的上下文和入口,都是通过SparkContext对象的实例来创建RDD。因此在实际Spark应用程序的开发中,在main方法中需要创建SparkContext对象,作为Spark应用程序的入口,并在Spark程序结束时关闭SparkContext对象。

    SparkContext初始化

    通过SparkConf对象设置集群配置的各种参数。

    val conf = new SparkConf().setMaster("local").setAppName("appName") var sc = new SparkContext(conf)

    spark分布式存储

    为什么使用分布式存储?| 分布式存储特点:

    数据分块存储在多台机器上每一数据块都可以冗余存储在多台机器上,以提高数据块的高可用性

    如何管理分布式存储

    在另外一台机器上启动一个管理所有节点以及存储在上面数据块的服务

    文件的概念:

    1: 存在于slave上的文件:表示真实存放数据的文件, 即本地磁盘文件

    2: 存在于master上的文件:表示逻辑文件,它表示这个逻辑

    分布式存储系统HDFS常见linux命令:

    查看hdfs文件系统的方式: hadoop fs –ls /user/hadoop-twq

    上传文件: hadoop fs -copyFromLocal word.txt /users/hadoop-twq

    下载文件: hadoop fs -get /users/hadoop-twq/word.txt

    设置数据块的备份数:hadoop fs -setrep 2 /users/hadoop-twq/word.txt

    删除文件的方式:hadoop fs -rm /users/hadoop-twq/word.txt

    Spark分布式计算概述:

    简介:在每一个block所在的机器针对block数据进行计算,将结果汇总到计算master

    原则: 移动计算而尽可能少的移动数据

    含义: 其实就是将单台机器上的计算扩展到多台机器上进行并行计算

    特点:就近计算,计算分摊到每个节点,基于性质相同的每个数据块,同时使用相同方法来计算。

    基本结构:计算主节点和资源主节点

    数据的分区方式是什么样的?

    文件的每一个block就是一个分区,当然我们也可以设置成2个block一个分区,对于key-value类型的数据的分区。我们可以根据key按照某种规则来进行分区,比如按照key的hash值来分区。

    在计算伊始读取分区数据的时候,会发生从其他机器节点通过网络传输读取数据吗?

    可能会发生,但是需要尽量避免,我们需要遵循移动计算而不移动数据的原则。每一个数据块都包含了它所在的机器的信息,我们需要根据这个数据块所在的机器,然后将计算任务发送到对应的机器上来执行,这个就是计算任务的本地性

    每一步出现的数据都是一份存储吗?

    不是,数据的存储只有一份,就是一开始的数据存储,在 shuffle 的时候会有中间临时数据的存储。

    MapReduce是基于磁盘的?是这样吗?

    MapReduce是基于磁盘的。

    Shuffle过程spark是基于内存的?

    不完全基于内存。spark的shuffle中间结果也是需要写文件的,只是对内存的利用比较充分而已。

    大数据中间结果数据的复用场景,存储方式?

    复用场景:一种是迭代式计算应用;一种是交互型数据挖掘应用

    存储方式:hadoop(磁盘)Spark(内存)

    Spark分布式内存的优点?

    将中间结果放到内存中,充分发挥内存的读写效率,避免重复性的工作一遍遍浪费CPU。

    Spark编程基础

    使用RDD的驱动力:

    快速:可伸缩性、位置感知调度、并行转换、数据分区

    准确:线性依赖、可序列化、失败自动重建、自动容错

    注意:RDD一旦创建就不可更改。重建不是从最开始的点来重建的,可以是上一步开始重建

    RDD定义: RDD叫做弹性分布式数据集 ,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

    RDD特点:

    不可更改性:一旦创建就不可更改,保证数据稳定、可追溯性、容错性数据流模型特点:自动容错、位置感知性调度和可伸缩性。数据分区特性:可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的;并行转换:并行方式来创建如(map、filter、join等);可序列化:在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上数据高度重用:RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续查询能够重用工作集,极大地提升查询速度。容错性:RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需要根据它的lineage就可重新计算出来,而不需要做特定checkpoint;

    RDD两种算子

    转换操作(Transformations):Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。如:map、Filter、groupby、join等行动操作(Actions):Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计划的动因。如:count、collect、save等

    RDD的宽依赖与窄依赖的概念以及各自的方法?

    宽依赖:多个子RDD中的分区数据来源于同一个父RDD数据分区。例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖

    窄依赖:父RDD的每个分区最多只被一个子RDD的一个分区使用。例如map、filter、union等操作都会产生窄依赖

    RDD的创建方式?

    通过并行集合(数组)创建RDD:可以调用 SparkContext 的 parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

    从一个稳定的存储系统中创建:Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是:

    本地文件系统的地址或者是分布式文件系统HDFS的地址或者是Amazon S3的地址等等

    RDD的转换操作

    RDD的转换和行动操作都需要创建 SparkContext 执行上下文,下面示例中的 sc 来自这里:

    val conf = new SparkConf().setMaster("local").setAppName("appName") var sc = new SparkContext(conf) map():参数是函数,函数应用于RDD每一个元素,返回值是新的RDD val rdd_arr = sc.parallelize(Array("b", "a", "c")) val rdd_map = rdd_arr.map(x => (x,1)) //rdd.collect(): 将RDD类型的数据转化为数组 println(rdd_arr.collect().mkString(", ")) // b, a, c println(rdd_map.collect().mkString(", ")) //(b,1), (a,1), (c,1) flatMap():参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD //使用flatMap对多个集合中的每个元素进行操作再扁平化 val data = sc.parallelize(List("I am learning Spark", "I like Spark")) val res = data.flatMap(x => x.split(" ")) println(res.collect.mkString(", ")) //I, am, learning, Spark, I, like, Spark //对一个集合中的元素进行扩充 val arr = sc.parallelize(Array(1,2,3)) val newArr = arr.flatMap(n => Array(n, n*100, 42)) println(arr.collect().mkString(", ")) //1, 2, 3 println(newArr.collect().mkString(", ")) //1, 100, 42, 2, 200, 42, 3, 300, 42 filter():参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD val rdd_filter = sc.parallelize(Array(1,2,3)).filter(n => n>2) println(rdd_filter.collect().mkString(", ")) //3 distinct():没有参数,将RDD里的元素进行去重操作 val arr = sc.parallelize(Array(1,2,3,3,4)) val newArr = arr.distinct() println(newArr.collect().mkString(", ")) //4, 1, 3, 2 union():参数是RDD,生成包含两个RDD所有元素的新RDDintersection():参数是RDD,求出两个RDD的共同元素subtract():参数是RDD,将原RDD里和参数RDD里相同的元素去掉cartesian():参数是RDD,求两个RDD的笛卡儿积groupBy():做一个分组。 val arr = sc.parallelize(Array("John", "Fred", "Anna", "James")) val newArr = arr.groupBy(w => w.charAt(0)) println(newArr.collect().mkString(", ")) //(A,CompactBuffer(Anna)), (J,CompactBuffer(John, James)), (F,CompactBuffer(Fred))

    coalesce:收缩合并分区,减少分区的个数

    reduceByKey和groupByKey

    val words = Array("one", "two", "two", "three", "three", "three") val wordRDD = sc.parallelize(words).map(word => (word, 1)) val wordCountsWithReduce = wordRDD .reduceByKey(_ + _) .collect() val wordCountsWithGroup = wordRDD .groupByKey() .map(t => (t._1, t._2.sum)) .collect() println(wordCountsWithReduce.mkString(", ")) //(two,2), (one,1), (three,3) println(wordCountsWithGroup.mkString(", ")) //(two,2), (one,1), (three,3)

    reduceByKey和groupByKey区别?

    ​ reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义,当采用reduceByKey时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合

    ​ groupByKey也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作,当采用groupByKey时,由于它不接收函数,Spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。

    RDD的行动操作

    collect(): 返回RDD所有元素 ,将RDD类型转换为数组类型count():RDD里元素个数countByValue():各元素在RDD中出现次数countByKey():各键在RDD中出现的次数 val arr = sc.parallelize(Array(('J',"James"),('F',"Fred"),('A',"Anna"),('J',"John"))) val keyCount = arr.countByKey() println(keyCount) //Map(A -> 1, J -> 2, F -> 1) reduce():并行整合所有RDD数据,例如求和操作 val arr = sc.parallelize(Array(1,2,3,4)) val res = arr.reduce((a,b) => a+b) println(arr.collect.mkString(", ")) //1, 2, 3, 4 println(res) //10 fold(0)(func):和reduce功能一样,不过fold带有初始值aggregate(0)(seqOp,combop):和reduce功能一样,但是返回的RDD数据类型和原RDD不一样foreach(func):对RDD每个元素都是使用特定函数saveAsTextFile(pathname):保存文件 rdd.saveAsTextFile("file:///f:/output-test.txt")

    RDD分区:RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。

    使用RDD分区的优点:

    增加并行度减少通信开销

    能从spark分区中获取的操作有:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()以及lookup()

    RDD分区原则:使得分区的个数尽量等于集群中的CPU核心(core)数目

    如何手动设置分区:

    创建 RDD 时:在调用 textFile 和 parallelize 方法时候手动指定分区个数即可,语法格式:sc.textFile(path, partitionNum)

    通过转换操作得到新 RDD 时:直接调用 repartition 方法即可

    RDD惰性机制:执行“动作”类型操作时,才是真正的计算。

    RDD的持久化:为了避免这种重复计算的开销,可以使用persist()方法对一个RDD标记为持久化。

    键值对RDD

    概念:键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口,虽然大部分Spark的RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD。

    创建方式:从文件中加载,通过并行集合(数组)创建RDD

    键值对常用转换操作的方法及其作用:

    (1)reduceByKey(func):使用func函数合并具有相同键的值

    (2)groupByKey():对具有相同键的值进行分组

    (3)Keys():keys只会把Pair RDD中的key返回形成一个新的RDD

    (4)Values():只会把Pair RDD中的value返回形成一个新的RDD

    (5)sortByKey():返回一个根据键排序的RDD

    (6)mapValues(func):对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化

    (7)join:内连接

    (8)combineByKey:聚合各分区的元素,而每个元素都是二元组

    单词计数程序案例:

    object WordCount { def main(args: Array[String]) { //定义文件路径 val inputFile = "file:///F:/words.txt" //配置集群参数 val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]") val sc = new SparkContext(conf) //读取文件 val textFile = sc.textFile(inputFile) //平铺 --> (单词, 1) --> 求和 val wordCount = textFile .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey((a, b) => a + b) //输出 wordCount.foreach(println) } }

    找出单科成绩为100的学生ID案例

    object grade { def main(args: Array[String]): Unit = { //定义SparkContext上下文 val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]") val sc = new SparkContext(conf) //读取文件 val math = sc.textFile("file:///e:/spark/student/result_math.txt") val tupleMath = math.map { x => val lineData = x.split(" "); (lineData(0), lineData(1), lineData(2).toInt) } val bigData = sc.textFile("file:///e:/spark/student/result_bigdata.txt") val tupleBigdata = bigData.map { x => val lineData = x.split(" "); (lineData(0), lineData(1), lineData(2).toInt) } //获取成绩结果 val studentResult = tupleMath.filter(_._3 == 100).union(tupleBigdata.filter(_._3 == 100)) //去重 val finalResult = studentResult.map(_._1).distinct finalResult.collect } }
    Processed: 0.025, SQL: 9