scala总结 1.scala简介和SDK的安装 1.1(1)(初级)熟练使用scala编写Spark程序 (2)(中级)动手编写一个简易Spark通信框架 (3)(高级)为阅读Spark内核源码做准备 (4)Scala是一种多范式的编程语言,其设计的初衷是要集成面向对象编程和函数式编程的各种特性。Scala运行于Java平台(Java虚拟机)(JS平台),并兼容现有的Java程序。 1.2为什么要学Scala (1)优雅:这是框架设计师第一个要考虑的问题,框架的用户是应用开发程序员,API是否优雅直接影响用户体验。 (2)速度快:Scala语言表达能力强,一行代码抵得上Java多行,开发速度快;Scala是静态编译的,所以和JRuby,Groovy比起来速度会快很多。 (3)能融合到Hadoop生态圈:Hadoop现在是大数据事实标准,Spark并不是要取代Hadoop,而是要完善Hadoop生态。JVM语言大部分可能会想到Java,但Java做出来的API太丑,或者想实现一个优雅的API太费劲。 1.3Scala编译器安装 (1)安装JDK 因为Scala是运行在JVM平台上的,所以安装Scala之前要安装JDK (2)安装Scala (2.1)Windows安装Scala编译器 访问Scala官网http://www.scala-lang.org/下载Scala编译器安装包,目前最新版本是2.12.x,但是目前大多数的框架都是用2.11.x编写开发的,Spark2.x使用的就是2.11.x,所以这里推荐2.11.x版本,下载scala-2.11.8.msi后点击下一步就可以了 (2.2)Linux安装Scala编译器 下载Scala地址http://downloads.typesafe.com/scala/2.11.8/scala-2.11.8.tgz然后解压Scala到指定目录 tar -zxvf scala-2.11.8.tgz -C /usr/java 配置环境变量,将scala加入到PATH中 vi /etc/profile export JAVA_HOME=/usr/java/jdk1.8.0_111 export PATH=$PATH:$JAVA_HOME/bin:/usr/java/scala-2.11.8/bin (2.3)Scala开发工具安装 目前Scala的开发工具主要有两种:Eclipse和IDEA,这两个开发工具都有相应的Scala插件,如果使用Eclipse,直接到Scala官网下载即可http://scala-ide.org/download/sdk.html。
由于IDEA的Scala插件更优秀,大多数Scala程序员都选择IDEA,可以到http://www.jetbrains.com/idea/download/下载社区免费版,点击下一步安装即可,安装时如果有网络可以选择在线安装Scala插件。这里我们使用离线安装Scala插件:
1.4安装IDEA,点击下一步即可。由于我们离线安装插件,所以点击Skip All and Set Defaults 1.5安装Scala插件:Configure -> Plugins -> Install plugin from disk -> 选择Scala插件 -> OK -> 重启IDEA 2.下载IDEA的scala插件,地址http://plugins.jetbrains.com/?idea_ce 1.6创建scala项目new project -> scala -> idea -> scala-SDK-2.11.12(scala SDK) -> SRC -> new -> scala class -> class,object(main),case… 2.变量的定义和使用
package cn._51doit.day01

/**
 * Shows the two kinds of variable declaration: `var` (can be reassigned) and
 * `val` (fixed once initialised), each with and without a type annotation.
 */
object VariableTest {
  def main(args: Array[String]): Unit = {
    // `var` declarations; the type is inferred when no annotation is written.
    var inferredInt = 125
    var typedInt: Int = 125
    var inferredText = "helloworld"
    var typedText: String = "helloworld"
    var inferredDouble = 10.0
    var typedDouble: Double = 10.0
    println(typedInt)

    // Further mutable variables.
    var i = 11
    var j: Int = 12
    var a = 25
    var b: Int = 25

    // `val` declarations: immutable, cannot be assigned again.
    val longText = "bababamamamadididiyeyeye"
    val typedLongText: String = "bababamamamadididiyeyeye"
    val greeting = "hello"
    val ten: Double = 10.0
    println(ten)
  }
}
// 3. Using for loops
package cn._51doit.day01

/**
 * Tour of the `for` expression: iterating strings and arrays, index-based
 * loops (`to`, `until`, `indices`), guards, nested generators and `yield`.
 */
object ForDemo {
  def main(args: Array[String]): Unit = {
    // A String is iterated one character at a time.
    val letters = "abcdefabcdef"
    for (ch <- letters) {
      println(ch)
    }

    val nums = Array(1, 2, 3, 4, 5, 6)
    for (n <- nums) {
      println(n)
    }
    // A single-statement body needs no braces.
    for (n <- nums) println(n)

    // Index-based iteration: `to` includes the upper bound ...
    for (idx <- 0 to nums.length - 1) {
      println(idx)
      println(nums(idx))
    }
    // ... `until` excludes it ...
    for (idx <- 0 until nums.length) {
      println(idx)
      println(nums(idx))
    }
    // ... and `indices` gives the index range of the array directly.
    for (idx <- nums.indices) {
      println(idx)
      println(nums(idx))
    }

    // `yield` collects the results into a new array.
    val timesTen = for (n <- nums) yield n * 10
    for (v <- timesTen) {
      println(v)
    }

    // A guard (`if`) filters elements before they reach `yield`.
    val evenTimesTen = for (n <- nums if n % 2 == 0) yield n * 10
    for (v <- evenTimesTen) {
      println(v)
    }

    // Two generators plus a guard, one result per line.
    for (tens <- 1 to 3; units <- 1 to 3 if tens != units) println(s"${10 * tens + units} ")

    // Second pass over the same constructs: `local <- array or collection`.
    val str = "abcdef"
    for (_ <- str) {
      // body intentionally empty (its println is disabled)
    }

    val arr = Array(1, 2, 3, 4, 5, 6)
    for (e <- arr) println(e)
    for (idx <- 0 to arr.length - 1) {
      println(arr(idx))
    }
    for (idx <- 0 until arr.length) {
      println(arr(idx))
    }
    for (idx <- arr.indices) {
      println(arr(idx))
    }

    val scaled = for (e <- arr) yield e * 10
    for (v <- scaled) {
      println(v)
    }

    // Same nested loop as above, but printed on a single line.
    for (tens <- 1 to 3; units <- 1 to 3 if tens != units) print(s"${10 * tens + units} ")

    val oneToTen = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val evens = for (k <- oneToTen if k % 2 == 0) yield k * 10
    for (v <- evens) println(v)
  }
}
// 4. Operators are methods
package cn._51doit.day01

/**
 * Shows that operators are ordinary methods: `a op b` is shorthand for
 * `a.op(b)`, so each pair of lines below prints the same value.
 */
object MethodDemo {
  def main(args: Array[String]): Unit = {
    println(1 + 2)
    println(1.+(2))
    println(1 to 10)
    println(1.to(10))
    println(1 - 2)
    println(1.-(2))
  }
} // closes the object; this brace was missing in the original snippet
// 5. Defining methods
package cn._51doit.day01

/**
 * Shows the different ways to declare a method: explicit or inferred result
 * type, with or without a parameter list, and `Unit`-returning methods.
 *
 * The original snippet contained this object twice, verbatim; a single copy
 * is kept because two definitions of the same object cannot coexist. The
 * deprecated procedure syntax (`def m9 { ... }`) is written as `: Unit =`.
 */
object MethodDemo {
  def main(args: Array[String]): Unit = {
    // An operator is just a method call written infix.
    println(1 + 2)
    println(1.+(2))
    println(1 to 10)
    println(1.to(10))
    println(1 - 2)
    println(1.-(2))

    val j = m1(5, 6)
    println(j)
    val k = m2(5, 6)
    println(k)
    val l = ss(1, 2)
    println(l)

    m3()
    m3 // a method declared with an empty parameter list may be called without parentheses
    m4 // declared without a parameter list, so it must be called without parentheses
    mw()
  }

  /** Prints a marker line and returns `x + y`; the result type is written explicitly. */
  def m5(x: Int, y: Int): Int = {
    println("++++++++++")
    x + y
  }

  /** Returns `x + y`; the result type is inferred. */
  def m6(x: Int, y: Int) = {
    x + y
  }

  /** Prints a greeting; declared with an empty parameter list. */
  def m7(): Unit = {
    println("helloworld")
  }

  /** Prints a greeting; declared without any parameter list. */
  def m8: Unit = {
    println("helloworld")
  }

  /** Prints a greeting (was procedure syntax `def m9{ ... }`). */
  def m9: Unit = {
    println("heloworld")
  }

  /** Calls [[m9]] and then returns `a + b`. */
  def m10(a: Int, b: Int): Int = {
    m9
    a + b
  }

  /** Prints a marker line and returns `x + y`. */
  def m1(x: Int, y: Int): Int = {
    println("++++++")
    x + y
  }

  /** Returns `x + y`; the result type is inferred. */
  def m2(x: Int, y: Int) = {
    x + y
  }

  /** Prints a greeting; declared with an empty parameter list. */
  def m3(): Unit = {
    println("helloworld")
  }

  /** Prints a greeting; declared without any parameter list. */
  def m4: Unit = {
    println("helloworld")
  }

  /** Prints a laugh (was procedure syntax `def mw(){ ... }`). */
  def mw(): Unit = {
    println("hahaha")
  }

  /** Calls [[m4]] and then returns `a * b`. */
  def ss(a: Int, b: Int): Int = {
    m4
    a * b
  }
}
// 6. Defining functions, including the fully annotated form
package cn._51doit.day01

/**
 * Shows the ways to write a function value, from the shortest literal to the
 * fully annotated form `val name: (ParamTypes) => Result = (params) => body`.
 *
 * The original object body also contained two bare `(x: Int) => x * x`
 * expression statements. They created a function and threw it away, so they
 * are kept only as the comment below.
 */
object FunctionDemo {
  def main(args: Array[String]): Unit = {
    val a = func(5)
    //println(a)
    val b = f2(1, 2)
    //println(b)
    //f0()
  }

  // An anonymous function literal on its own looks like: (x: Int) => x * x

  // Parameter types on the literal; result type inferred.
  val f4 = (a: Int, b: Int) => (a + b)
  // Full function type on the val; parameter types of the literal inferred.
  val f5: (Int, Int) => (Int) = (a, b) => (a * b)
  // One-parameter functions, written with progressively less ceremony.
  val f6: Int => Int = (x: Int) => (x * x)
  val f7: Int => Int = (x) => (x * x)
  val f8: Int => Int = x => (x * x)
  // Mixed parameter types.
  val f9: (Int, Double) => Double = (a: Int, b: Double) => (a + b)

  val func1 = (x: Int) => x * x
  val func2: Int => Int = (x: Int) => x * x
  val func3: Int => Int = (x) => x * x
  val func4: Int => Int = x => x * x

  // A block body: the value of the last expression is the result.
  val func = (x: Int) => {
    x * x
  }

  val f1: (Int, Int) => Int = (c, d) => c + d
  val f2 = (x: Int, y: Int) => x + y
  val f3: (Int, Int) => Int = (a, b) => a + b

  // A function that takes no arguments.
  val f0 = () => println("helloworld")
}
// 7. Passing a function as an argument to a method
package cn._51doit.day01

/**
 * Shows methods whose parameters are functions, called both with named
 * function values and with anonymous function literals.
 *
 * The deprecated procedure syntax (`def m4(...) { ... }`) is replaced by an
 * explicit `: Unit =`; behaviour and signatures are unchanged.
 */
object MethodAndFunction {
  def main(args: Array[String]): Unit = {
    val abc = m(5)
    //println(abc)

    // Define some function values.
    val f3 = (x: Int) => x * x
    val f1 = (x: Int) => x * x
    val f2 = (x: Int) => x + x
    val f4 = (x: Int) => x + x
    val f5 = (x: Int, y: Double) => x * y

    // Resolves to the mf2 overload taking an (Int, Double) => Double function.
    // That overload returns Unit, so the println below prints "()".
    val b = mf2(f5)
    println(b)

    val c = m21(f2)
    println(c)

    m5(f5)

    // Pass a function to a method as an argument.
    //mf(f1)
    //val r = mf1(6, f2)

    // Call with anonymous functions.
    val z = mf1(9, (x: Int) => x * 9)
    val r = mf1(8, (x: Int) => x * 10)
    //println(r)
  }

  /** Returns `x * x`. */
  def m(x: Int): Int = x * x

  /** Returns `a * a`. */
  def m12(a: Int): Int = a * a

  /** Applies `f` to the constant 2. */
  def m21(f: Int => Int): Int = {
    f(2)
  }

  /** Applies `f` to `v`. */
  def m3(v: Int, f: Int => Int): Int = {
    f(v)
  }

  /** Prints `f(5)`. */
  def m4(f: Int => Int): Unit = {
    println(f(5))
  }

  /** Prints `f(5, 6.0)`. */
  def m5(f: (Int, Double) => Double): Unit = {
    println(f(5, 6.0))
  }

  /** Prints `f(5)`; overloaded with the two-argument `mf1` below. */
  def mf1(f: Int => Int): Unit = {
    println(f(5))
  }

  /** A method whose only parameter is a function; prints `f(5)`. */
  def mf(f: Int => Int): Unit = {
    println(f(5))
  }

  /** Takes an Int value and a function (the computation to run) and returns `f(v)`. */
  def mf2(v: Int, f: Int => Int): Int = {
    f(v)
  }

  /** Takes an Int value and a function and returns `f(v)`. */
  def mf1(v: Int, f: Int => Int): Int = {
    f(v)
  }

  /** Prints `f(5, 6.0)`; overloaded with the two-argument `mf2` above. */
  def mf2(f: (Int, Double) => Double): Unit = {
    println(f(5, 6.0))
  }

  /** Takes a two-parameter function and prints `f(5, 6.0)`. */
  def mf3(f: (Int, Double) => Double): Unit = {
    println(f(5, 6.0))
  }
}
// 8. Defining arrays
package cn._51doit.day01

/**
 * Shows how to create arrays and use the common operations on them:
 * `sum`, `min`, `max`, `sorted`, `reverse`, `filter`, `map` and `reduce`.
 *
 * Fix: `println(sorted)` printed the array's identity string; it now prints
 * the elements via `toBuffer`, like every other print in this object.
 */
object ArrayDemo {
  def main(args: Array[String]): Unit = {
    // Array created from its elements.
    val arra = Array(1, 2, 3, 4, 5, 6)
    // Array created with a fixed length, then filled by index.
    val arrb = new Array[String](10)
    arrb(0) = "a"
    arrb(1) = "b"
    arrb(2) = "c"

    val sum1 = arra.sum
    val max1 = arra.max
    val min1 = arra.min
    val length = arra.length
    val sorted1 = arra.sorted
    println(sorted1.toBuffer)
    val reverse1 = arra.reverse
    println(reverse1.toBuffer)

    // filter: the predicate can be a named function or a literal of any form.
    val f4 = (a: Int) => a % 2 == 0
    val arrc = arra.filter(f4)
    val arrd = arra.filter((a: Int) => a % 2 == 0)
    val arre = arra.filter((a) => a > 0)
    val arrf = arra.filter(a => a > 0)

    // map: each call returns a new array (the results are discarded here).
    val f5 = (a: Int) => (a * 100)
    arra.map(f5)
    arra.map(a => a * 2)
    arra.filter(a => a > 0).map(a => a * 2)
    // NOTE(review): arrb(3) to arrb(9) are never assigned above, so this call
    // looks like it would hit unset (null) slots; confirm the intent.
    arrb.map(a => a.toUpperCase)

    val arr1 = Array(1, 2, 3, 4, 5, 6)
    val arr2 = new Array[Int](10)
    arr2(0) = 1
    arr2(1) = 2
    arr2(2) = 3
    val sum = arr1.sum
    println(sum)
    // The slots of arr2 that were not assigned keep the Int default 0.
    val min = arr2.min
    println(min)
    val max = arr2.max
    println(max)

    val arr3 = Array(1, 2, 5, 3, 4, 6, 7, 8, 9, 10)
    val sorted = arr3.sorted
    println(sorted.toBuffer)
    val reverse = arr3.reverse
    println(reverse.toBuffer)
    val arr4 = for (i <- arr3 if i > 2) yield i
    println(arr4.toBuffer)

    val f1 = (x: Int) => x % 2 == 0
    val f2 = (y: Int) => y < 4
    val arr5 = arr3.filter(f1)
    val arr6 = arr3.filter(f2)
    val arr7 = arr3.filter((x: Int) => x % 2 != 0)
    val arr8 = arr3.filter((x) => x % 2 != 0)
    val arr9 = arr3.filter(x => x % 2 != 0)
    val arr10 = arr3.map(x => x * x)

    // reduce combines the elements pairwise.
    val abcabc = arr3.reduce((a, b) => a + b)
    println(abcabc)
    val defdef = arra.reduce((a, b) => a * b)
    println(defdef)
    println(arr10.toBuffer)

    val f3 = (x: Int) => (x * 100)
    val arr11 = arr3.map(f3)
    val arr12 = arr3.map(x => x * 2)
    val arr13 = Array("abc", "def", "lmn")
    val arr14 = arr13.map(x => x.toUpperCase)
    val arr15 = arr3.filter(x => x % 2 == 0).map(x => x * 125)
    val abc = arr3.reduce((x, y) => x + y)
    println(abc)
  }
}

/**
 * Shows that `if` is an expression: it yields the value of the branch taken,
 * and its static type is the common supertype of both branches.
 */
object IfDemo {
  def main(args: Array[String]): Unit = {
    val a = 10
    val b = if (a > 1) 25 else 10
    val c: Int = if (a > 1) {
      25
    } else {
      10
    }
    // Branches of different types widen the result type.
    val d = if (a > 1) 25 else "helloworld"
    val e: Any = if (a > 1) {
      25
    } else if (a < 25) {
      10
    } else {
      "HELLOWORLD"
    }
    println(e)

    val f = if (a > 1) 125 else ()
    // Without an else branch, the missing branch yields (), the only value of Unit.
    val g = if (a > 125) 10

    val i = 10
    val j: Int = if (i > 25) 25 else 10
    // A block evaluates to its last expression, so the leading 10 is discarded.
    val k = if (i > 0) {
      10
      25
    } else {
      0
    }
    //println(k)
    val l = if (i > 1) 1
    val m: Any = if (i > 25) 1 else ()
    val n = if (i > 0) 1 else "abc"
    val o = {
      if (i > 10) {
        12
      } else if (i > 5) {
        "125哦"
      } else {
        true
      }
    }
    println(o)
  }
}
// 9. Shorthand for anonymous functions
package cn._51doit.day02

/**
 * Pairs each named-parameter function literal with its `_` placeholder
 * shorthand; the two values in every pair hold the same elements.
 */
object ArrayDemo {
  def main(args: Array[String]): Unit = {
    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // filter
    val evensNamed: Array[Int] = nums.filter(a => a % 2 == 0)
    val evensShort: Array[Int] = nums.filter(_ % 2 == 0)

    // map
    val tenfoldNamed: Array[Int] = nums.map(b => b * 10)
    val tenfoldShort: Array[Int] = nums.map(_ * 10)

    // filter followed by map
    val chainedNamed: Array[Int] = nums.filter(a => a % 2 == 0).map(a => a * 10)
    val chainedShort: Array[Int] = nums.filter(_ % 2 == 0).map(_ * 10)

    // reduce: each `_` stands for the next parameter, in order.
    val totalNamed: Int = nums.reduce((x, y) => (x + y))
    val totalShort: Int = nums.reduce(_ + _)
  }
}
// 10. Turning a method into a function, and the ways to pass a function to a method
package cn._51doit.day02

/** Turns the method `m` into a function value, once with `m _` and once with a wrapping literal. */
object MethodToFunction {
  def main(args: Array[String]): Unit = {
    // `m _` builds a new function value that calls the method.
    val viaUnderscore = m _
    val first = viaUnderscore(5)
    println(first)

    // The same thing written out as a function literal.
    val viaLiteral = (a: Int) => {
      m(a)
    }
    val second = viaLiteral(6)
    println(second)

    println("helloworld")
  }

  /** Prints a trace line and returns `x + x`. */
  def m(x: Int): Int = {
    println("this method invoked")
    x + x
  }
}

/**
 * Four equivalent ways to hand the method `m` to `map`.
 * (The object name is kept as in the source; the code shows passing a method
 * where a function is expected rather than by-name parameters.)
 */
object CallByName {
  def main(args: Array[String]): Unit = {
    val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // Syntactic sugar: pass the method name directly.
    val r1: Array[Int] = arr.map(m)
    println(r1.toBuffer)

    // Same effect: convert explicitly with `m _` (or `val f = m _; arr.map(f)`).
    val r2: Array[Int] = arr.map(m _)
    println(r2.toBuffer)

    // A function value that forwards its argument to the method.
    val ff = (x: Int) => {
      m(x)
    }
    // arr.map(ff) is equivalent to the inline literal used here.
    val r3: Array[Int] = arr.map(x => m(x))
    println(r3.toBuffer)

    // Placeholder form.
    val r4: Array[Int] = arr.map(m(_))
    println(r4.toBuffer)
  }

  /** Returns `x * x`. */
  def m(x: Int): Int = x * x
}
// 11. Using the reduce methods
package cn._51doit.day02

/** Compares `reduceLeft`, `reduce`, `sum` and `product` on the array 1 to 10. */
object ReduceLeftDemo {
  def main(args: Array[String]): Unit = {
    val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // reduceLeft folds from the left:
    //   1 + 2 = 3, 3 + 3 = 6, 6 + 4 = 10, 10 + 5 = 15, ...
    val leftSum = arr.reduceLeft(_ + _)
    val plainSum = arr.reduce(_ + _)
    // Reducing with addition can be replaced by `sum`.
    val builtInSum = arr.sum

    val plainProduct = arr.reduce(_ * _)
    // Reducing with multiplication can be replaced by `product`.
    val builtInProduct = arr.product

    // 1 - 2 - 3 - ... - 10 = -53
    val difference = arr.reduce(_ - _)

    println(leftSum)
    println(plainSum)
    println(builtInSum)
    println(plainProduct)
    println(builtInProduct)
    println(difference)
  }
}

/** Shows that `reduceRight` groups from the right-hand end of the array. */
object ReduceRightDemo {
  def main(args: Array[String]): Unit = {
    val arr = Array(1, 2, 3, 4, 5)

    val rightSum = arr.reduceRight(_ + _)
    println(rightSum)

    // Evaluation starts from the right: 1 - (2 - (3 - (4 - 5))) = 3
    val rightDifference = arr.reduceRight(_ - _)
    println(rightDifference)
  }
}
// 12. Using the fold methods
package cn._51doit.day02

/**
 * `fold` is `reduce` with an explicit starting value.
 *
 * Fix: `r3` starts from 1 and sits next to the product `i3`, but combined
 * with `+`; it now multiplies, so `fold(1)(_ * _)` mirrors `reduce(_ * _)`
 * the same way `fold(0)(_ + _)` mirrors `reduce(_ + _)`.
 */
object FoldDemo {
  def main(args: Array[String]): Unit = {
    val arr = Array(1, 2, 3, 4, 5)

    // 100 + 1 = 101, 101 + 2 = 103, ...
    val r1 = arr.fold(100)((x, y) => x + y)

    val i2 = arr.reduce(_ + _)
    println(i2)
    // Starting from 0 gives the same total as the reduce above.
    val r2 = arr.fold(0)((x, y) => x + y)

    val i3 = arr.reduce(_ * _)
    // Starting from 1 gives the same product as the reduce above.
    val r3 = arr.fold(1)((x, y) => x * y)
    //println(r1)
  }
}

/** `foldLeft` applies the operation from the left, starting with the initial value. */
object FoldLeftDemo {
  def main(args: Array[String]): Unit = {
    val arr = Array(1, 2, 3, 4, 5)

    // 100 + 1 = 101, 101 + 2 = 103, ...
    val r1 = arr.foldLeft(100)((x, y) => x + y)
    // 100 - 1 - 2 - 3 - 4 - 5 = 85
    val r2 = arr.foldLeft(100)(_ - _)
    println(r1)
    println(r2)
  }
}

/** `foldRight` applies the operation from the right, starting with the initial value. */
object FoldRightDemo {
  def main(args: Array[String]): Unit = {
    val arr = Array(1, 2, 3, 4, 5, 6)

    val abc: Int = arr.foldRight(100)(_ + _)
    println(abc)

    val lmn: Int = arr.foldRight(100)(_ - _)
    println(lmn)
    // Trace for the subtraction:
    //   6 - 100   = -94
    //   5 - (-94) = 99
    //   4 - 99    = -95
    //   3 - (-95) = 98
    //   2 - 98    = -96
    //   1 - (-96) = 97
  }
}
// 13. Sorting
package cn._51doit.day02

/** Shows `sorted`, `sortWith` and `sortBy`; each returns a new array and leaves the source untouched. */
object SortDemo {
  def main(args: Array[String]): Unit = {
    val arr = Array(5, 2, 7, 3, 9, 4, 6, 1, 8)

    // Natural ascending order.
    val ascending = arr.sorted
    println("原来的:" + arr.toBuffer)
    println("排序的:" + ascending.toBuffer)

    // Custom comparison: descending.
    val descending = arr.sortWith((x, y) => x > y)
    println("排序的2:" + descending.toBuffer)

    val arr_new = Array(11, 30, 10, 2, 88, 33, 3, 25, 100)
    // Sort by a key; here the key is the element itself.
    val byValue = arr_new.sortBy(x => x)
    println("排序的3:" + byValue.toBuffer)

    // Sort by the string form of each number; the element type stays Int.
    val byText = arr_new.sortBy(x => x.toString)
    val byConcatenatedText = arr.sortBy(x => x + "")
    println("排序的4:" + byText.toBuffer)

    // Sorting by the negated value gives descending order ...
    val negatedThenReversed = arr.sortBy(-_).reverse
    val negatedShort = arr.sortBy(-_)
    val negatedNamed = arr.sortBy(x => -x)
  }
}
// 14. Tuples
package cn._51doit.day02

/** Shows tuple creation, the equivalent type notations, element access and `swap`. */
object TupleDemo {
  def main(args: Array[String]): Unit = {
    // Three ways to write the same three-element tuple.
    val inferred = ("abc", 1, 2.0)
    val sugared: (String, Int, Double) = ("abc", 1, 2.0)
    val explicit: Tuple3[String, Int, Double] = ("abc", 1, 2.0)

    // Elements are read with _1, _2, _3 (numbering starts at 1).
    val third = explicit._3

    val pair = ("abc", 123)
    // swap exchanges the two positions and produces a new tuple.
    val swapped = pair.swap

    // A function can return several values at once by wrapping them in a tuple.
    val flip: (Int, Double) => (Double, Int) = (a: Int, b: Double) => (b, a)

    println(third)
  }
}
// 15. Using ArrayBuffer
package cn._51doit.day02

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * Walks through the main ArrayBuffer operations. The ones to remember are
 * adding (`append`, `+=`, `++=`) and removing (`remove`, `-=`).
 */
object ArrayBufferDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ArrayBuffer[Int]()

    // Add elements one at a time.
    buf.append(1)
    buf.append(2)
    println(buf)
    buf += 3
    buf += 4
    buf += 5
    println(buf)

    // Remove: remove(index, count) removes by position, -= removes by value.
    //buf.remove(0)
    buf.remove(1, 2)
    buf -= 4
    println(buf)

    // Insert several elements at a given position.
    buf.insert(1, 2, 3, 4)
    println(buf)

    // ++= appends every element of an array or collection.
    buf ++= Array(6, 7, 8)
    println(buf)
    buf ++= buf
    println(buf)

    buf.foreach { e =>
      println(e)
    }

    // Read and update by index.
    val head = buf(0)
    println(head)
    buf(0) = 1000
    println(buf)

    // toBuffer / toArray each build a new collection; the source is unchanged.
    val arr = Array(1, 2, 3, 4, 5)
    val asBuffer: mutable.Buffer[Int] = arr.toBuffer
    val asArray: Array[Int] = buf.toArray
  }
}
// 16. Using List and ListBuffer
package cn._51doit.day02

import scala.collection.mutable.ListBuffer

// Array: the length is fixed once defined, but the elements can be changed.
// Tuple: its contents cannot be changed once defined.
// Scala collections come in two families: mutable and immutable.
// Mutable collection: both the length and the elements can change.
// Immutable collection: neither the length nor the elements can change.

/** Shows the immutable List: every "modifying" operation returns a new list. */
object ListDemo {
  def main(args: Array[String]): Unit = {
    val oneToTen = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // An immutable list.
    val base = List(1, 2, 3, 4, 5)
    val first = base(0)
    val extra = List(5, 6, 7)

    // :+ appends an element and produces a new list.
    val appended = base :+ 1
    // +: puts an element at the front.
    val prepended = 0 +: base
    // ++ concatenates two lists into a new one.
    val joined = base ++ extra

    // head is the first element, tail is everything after it.
    val h: Int = base.head
    val tail: List[Int] = base.tail
    // Element at a given index.
    val fifth = base(4)
    println(first)

    // take(n) keeps the first n elements.
    val firstOne = oneToTen.take(1)
    val firstTwo = oneToTen.take(2)
    val firstThree = oneToTen.take(3)
    val firstFour = oneToTen.take(4)
    val firstFive = oneToTen.take(5)
    val firstSix = oneToTen.take(6)
  }
}

/** Shows the mutable ListBuffer and conversions to and from List. */
object ListBufferDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ListBuffer[Int]()
    buf += 1
    buf += 2

    val source = List(4, 5, 6)
    buf ++= source
    println(buf)
    buf -= 4

    // toList builds a new immutable List from the buffer.
    val frozen = buf.toList
    // toBuffer builds a new mutable buffer from the List.
    val thawed = source.toBuffer
  }
}
// 18. Using Set
package cn._51doit.day02

import scala.collection.mutable

/** Shows the default immutable Set and the mutable HashSet. */
object SetDemo {
  def main(args: Array[String]): Unit = {
    // Set(...) without an import is the immutable one: unordered, duplicates dropped.
    val fixed = Set(100, 2, 8, 19, 50, 77, 8, 2)

    // Membership test.
    val hasFifty = fixed.contains(50)
    val tenfold: Set[Int] = fixed.map(_ * 10)
    val someThree = fixed.take(3)
    //println(fixed)
    //println(tenfold)
    //println(someThree)

    // A mutable set.
    val growing = new mutable.HashSet[Int]()
    growing += 1
    growing += 100
    growing += 5
    growing += (6, 7)

    val more = Set(110, 119)
    growing ++= more
    println(growing)

    growing -= 6

    // Applying a set to a value is the same as contains: growing.contains(5)
    val hasFive = growing(5)
    println(hasFive)
  }
}
// 19. Using Map
package cn._51doit.day02

import scala.collection.mutable

/**
 * Shows immutable and mutable maps: creation, update, removal and iteration.
 *
 * Fix: `m1` mixed Int and Double values, which widened the map to
 * `Map[String, AnyVal]`. All values are now Double and the type is
 * annotated, so a lookup yields a usable number.
 */
object MapDemo {
  def main(args: Array[String]): Unit = {
    // Immutable maps: entries are pairs, written as tuples or with `->`.
    val m0 = Map(("aaa", 111), ("bbb", 222), ("ccc", 333))
    val m1: Map[String, Double] = Map("a" -> 1.0, "b" -> 2.0, "spark" -> 3.0, "flink" -> 1.11)
    val v = m1("spark")

    // Mutable map: four equivalent ways to add an entry.
    val m2 = new mutable.HashMap[String, Int]()
    m2("a") = 10
    m2 += ("b" -> 20)
    m2 += (("c", 30))
    m2.put("spark", 2)
    println(m2)

    // Remove by key.
    m2 -= "b"
    println(m2)

    // Iterate over the entries; each one is a (key, value) tuple.
    for (kv <- m2) {
      val key = kv._1
      val value = kv._2
      println("key: " + key + " , value: " + value)
    }

    // Convert the map to a List of pairs.
    val list: List[(String, Int)] = m2.toList

    // Keys only.
    val keys: Iterable[String] = m2.keys
    for (k <- keys) {
      println(k)
    }

    // Values only.
    val values: Iterable[Int] = m2.values
    for (value <- values) {
      println(value)
    }
  }
}
// 20. Writing WordCount
package cn._51doit.day02

/**
 * Word count over pre-counted (word, count) pairs: group by word, then total
 * the counts of each group in several equivalent ways.
 */
object WordCountDemo3 {
  def main(args: Array[String]): Unit = {
    val wordAndCount = Array(("spark", 3), ("hadoop", 2), ("spark", 5), ("hadoop", 1), ("flink", 2))
    val byWord = wordAndCount.groupBy(_._1)

    // Project the counts and sum them.
    val counts1 = byWord.mapValues(_.map(_._2).sum)
    val counts2 = byWord.mapValues(g => g.map(g => g._2).sum)
    val counts3 = byWord.mapValues(arr => arr.map(_._2).sum)

    // Reduce the pairs of a group into one (word, total) pair.
    val counts4 = byWord.mapValues(g => g.reduce((x, y) => (x._1, x._2 + y._2)))
    val counts5 = byWord.mapValues(_.reduce((x, y) => (x._1, x._2 + y._2)))

    // Fold the counts onto an Int accumulator starting at 0.
    val counts6 = byWord.mapValues(g => g.foldLeft(0)((x, y) => x + y._2))
    val counts7 = byWord.mapValues(_.foldLeft(0)(_ + _._2))

    println(counts7.toBuffer)
  }
}

/**
 * Word count over raw lines without the intermediate (word, 1) pairs: split
 * and flatten, group identical words, then take the size of each group.
 */
object WordCountDemo2 {
  def main(args: Array[String]): Unit = {
    val lines = Array("spark hadoop spark flink", "hadoop flink flink spark flink", "hbase flink")

    // Split each line and flatten the pieces into one array of words.
    val words: Array[String] = lines.flatMap(_.split(" "))
    // Group identical words together.
    val group: Map[String, Array[String]] = words.groupBy(w => w)

    // The count of a word is the length of its group.
    val counts1: Map[String, Int] = group.map(g => (g._1, g._2.length))
    val counts2: Map[String, Int] = group.mapValues(g => g.length)

    // The (word, 1) variant of this pipeline is written out in WordCountDemo1.
  }
}

/**
 * The full word-count pipeline: split, pair each word with 1, group,
 * aggregate, convert to a List and sort by count.
 */
object WordCountDemo1 {
  def main(args: Array[String]): Unit = {
    val lines = Array("spark hadoop spark flink", "hadoop flink flink spark flink", "hbase flink")

    // Split and flatten (same as lines.map(_.split(" ")).flatten).
    val words: Array[String] = lines.flatMap(l => l.split(" "))
    // Pair every word with 1.
    val wordAndOne: Array[(String, Int)] = words.map(w => (w, 1))
    // Group the pairs by word.
    val group: Map[String, Array[(String, Int)]] = wordAndOne.groupBy(w => w._1)
    // Aggregate: one pair per occurrence, so the group length is the count.
    val result: Map[String, Int] = group.map(g => (g._1, g._2.length))
    // A Map has no order; convert to a List before sorting.
    val list1: List[(String, Int)] = result.toList
    // Sort by count, ascending. For descending order use
    // list1.sortBy(l => l._2).reverse or list1.sortBy(l => -l._2).
    val list2: List[(String, Int)] = list1.sortBy(l => l._2)

    println(list2)
    list2.foreach { abc =>
      println(abc)
    }
  }
}
// 21. Parallel collections
package cn._51doit.day02 {

  /** `aggregate` on an ordinary (sequential) array. */
  object AgreegateDemo {
    def main(args: Array[String]): Unit = {
      val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      // On an ordinary collection the second function is not used,
      // but it must still be supplied.
      val r1 = arr.aggregate(100)(_ + _, _ - _)
      println(r1)
    }
  }
}

package cn._51doit.day03 {

  object AgreegateDemo {

    /**
     * `aggregate` on a parallel collection. The collection is split into
     * parts according to the number of threads; each thread computes a
     * partial result with the first function, and the partial results are
     * then merged with the second function.
     *
     * Fix: the original called `println()` with no argument, so the computed
     * value `r` was never shown; it is printed now.
     */
    object ParAggregate {
      def main(args: Array[String]): Unit = {
        val lst = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val r = lst.par.aggregate(0)(_ + _, _ + _)
        println(r)
      }
    }
  }
}
// 22. Defining classes and members
package cn._51doit.day03

/** A class with one read-only member and two mutable members. */
class Boy {
  // A `val` member is comparable to a `final` field in Java.
  val id = "1000001"

  // `= _` initialises a `var` member to the default value of its type.
  var name: String = _
  var age: Int = _
}

/** Reads and updates the members of [[Boy]]. */
object Test {
  def main(args: Array[String]): Unit = {
    val boy = new Boy

    // Both `val` and `var` members can be read, much like Java getters.
    println(boy.id)
    println(boy.name)
    println(boy.age)

    // A `val` member cannot be assigned:
    //boy.id = "1888888"

    // A `var` member can be assigned.
    boy.name = "abc"
    boy.age = 125

    println(boy.name)
    println(boy.age)
  }
}
// 23. Constructors and related notes
package cn._51doit.day03

/**
 * Primary-constructor parameters marked `val` or `var` become members of the
 * class: `name` is read-only, `age` can be reassigned.
 */
class Dog(val name: String, var age: Int)

object Dog {
  def main(args: Array[String]): Unit = {
    val dog = new Dog("tom", 6)
    println(dog.name)
    println(dog.age)
    //dog.name = "jerry"   // not allowed: name is a val
    dog.age = 10
    println(dog.age)
  }
}

/**
 * A class with auxiliary constructors. Auxiliary-constructor parameters
 * cannot be marked `val`/`var`, and the first statement of each must call
 * another constructor.
 */
class Cat(val name: String, var age: Int) {

  var gender: String = _

  /** Auxiliary constructor: delegates to the primary one, then sets `gender`. */
  def this(name: String, age: Int, gender: String) = {
    this(name, age)
    this.gender = gender
  }

  /** No-argument auxiliary constructor: passes `null` and `0` to the primary one. */
  def this() = {
    this(null, 0)
  }
}

object Cat {
  def main(args: Array[String]): Unit = {
    //val cat = new Cat("tomcat", 10, "male")
    //println(cat.gender)
    val cat = new Cat
    println(cat.name)
  }
}
// 24. Singleton objects and companion objects
package cn._51doit.day03

/** A class whose private members are visible to its companion object only. */
class Girl {
  val id = "20000"

  private var age: Int = _
  private var weight: Double = _

  // Runs on construction; the class may call the companion's private method.
  Girl.sayHi()
}

/** An ordinary singleton object (not a companion of Girl). */
object Test2 {
  def main(args: Array[String]): Unit = {
    val girl = new Girl
    //println(girl.age)   // not accessible: age is private to Girl

    Girl.sayHello()
    // Not accessible from here, because sayHi is private:
    //Girl.sayHi()
  }
}

// Companion object: an object in the same file with the same name as a class.
// A class and its companion can access each other's private members.
object Girl {
  def main(args: Array[String]): Unit = {
    val girl = new Girl
    girl.weight = 125.000
    val weight = girl.weight
    println(weight)
    println(weight)
  }

  private def sayHi(): Unit = println("hi~~~")

  def sayHello(): Unit = println("hello~~~")
}

/** Shows `private[this]`, which hides a member even from the companion object. */
class Man {
  // With [this] after private, even the companion object cannot access it.
  private[this] var money: Double = _

  def setMoney(m: Double): Unit = {
    money = m
  }

  def getMoney(): Double = money
}

object Man {
  def main(args: Array[String]): Unit = {
    val man = new Man
    //man.money   // not accessible: private[this]
    man.setMoney(1000.0)
    println(man.getMoney())

    sayCan()
    // sayNo() returns Unit, so this prints its message and then "()".
    println(sayNo())
  }

  private[this] def sayCan(): Unit = println("I can doit")

  private[this] def sayNo(): Unit = println("NO~~~~")
}

// Package-level access control: the class is visible inside package day03 only.
private[day03] class Woman

/**
 * A class with a private primary constructor: instances can only be created
 * from the companion object.
 */
class Pig private (
    val name: String,
    private var age: Int,
    // No val or var: not a member of the class, and not inherited by subclasses.
    nickname: String,
    private[this] var gender: String = "male"
) {
  // Statements in the class body belong to the primary constructor.
  println(nickname)
  println(gender)
}

object Pig {
  def main(args: Array[String]): Unit = {
    val pig = new Pig("猪八戒", 30, "天蓬元帅")
    println(pig.name)
    println(pig.age)
    //println(pig.nickname)
    //println(pig.gender)
  }
}
// 25. Extending classes and implementing traits
package cn._51doit.day03

/** Abstract base class with one abstract and one concrete method. */
abstract class Animal {
  /** Abstract: subclasses must provide it. */
  def speak(): Unit

  def breath(): Unit = println("呼吸氧气!")
}

trait Runnable {
  def run(): Unit = println("走")
}

trait Flyable {
  def fly(): Unit = println("飞飞飞飞")
}

trait Fightable {
  def fight(): Unit = println("打你")
}

// When a class has no superclass, the first trait it implements is
// introduced with `extends`; further traits use `with`.
class Pig2 extends Flyable with Runnable {
  override def fly(): Unit = println("飞猪!")
}

object Pig2 {
  def main(args: Array[String]): Unit = {
    // A trait can be mixed in when the instance is created.
    val fighter = new Pig2 with Fightable {
      override def fight(): Unit = println("用耙子打你")
    }
    // fighter.fly()
    // fighter.run()
    // fighter.fight()

    val flyer: Flyable = new Pig2
    flyer.fly()
  }
}

/** Extends a class and mixes in two traits. */
class Monkey extends Animal with Runnable with Flyable {

  // Implementing an abstract method: `override` is optional.
  def speak(): Unit = println("啊啊哦哦")

  // Overriding a concrete method: `override` is required.
  override def breath(): Unit = println("呼吸82年氧气")

  override def run(): Unit = println("蹦蹦跳跳的走")

  override def fly(): Unit = println("乘着筋斗云飞")
}

object Monkey {
  def main(args: Array[String]): Unit = {
    val monkey = new Monkey
    monkey.breath()
    monkey.speak()
    monkey.run()
    monkey.fly()
  }
}
// 26. Polymorphism
// Fragment (belongs inside a method such as Pig2.main): a Pig2 instance is
// held in a variable of the trait type Flyable and called through it.
val flyer: Flyable = new Pig2
flyer.fly()
// 27. The apply method
package cn._51doit.day03

/**
 * Shows that `Name(args)` is shorthand for `Name.apply(args)`, both for the
 * library's `Array` and for the two `apply` methods defined here.
 */
object ApplyDemo {

  /** Returns `x * x`. */
  def apply(x: Int): Int = x * x

  /** Returns `a + b`. */
  def apply(a: Int, b: Int): Int = a + b

  def main(args: Array[String]): Unit = {
    // Array followed by parentheses is shorthand for calling Array.apply.
    val viaSugar = Array(1, 2, 3, 4, 5, 6)
    val viaApply = Array.apply(1, 2, 3, 4, 5, 6)
    // `new` with a length creates an array of that size instead.
    val sized = new Array[Int](5)

    // Unqualified call from inside the object.
    val square1 = apply(5)
    println(square1)
    val sum1 = apply(2, 5)
    println(sum1)

    // Qualified call of apply.
    val square2 = ApplyDemo.apply(6)
    println(square2)
    val sum2 = ApplyDemo.apply(2, 5)
    println(sum2)

    // Shorthand: the object name followed by the arguments.
    val square3 = ApplyDemo(6)
    println(square3)
    val sum3 = ApplyDemo(2, 5)
    println(sum3)

    // The same three forms once more.
    val r1 = apply(5)
    println(r1)
    val r2 = ApplyDemo.apply(6)
    println(r2)
    val r3 = ApplyDemo(7)
    println(r3)
    val r4 = ApplyDemo(5, 6)
    println(r4)
  }
}
// 28. Pattern matching
package cn._51doit.day03.cs

import scala.util.Random

/**
 * Matching on String values.
 *
 * Fix: the original also contained a placeholder match whose first two
 * alternatives were both `case ""`. The second could never be reached and
 * every branch only printed an empty string, so that skeleton is removed.
 */
object CaseDemo01 extends App {
  val arr = Array("YoshizawaAkiho", "YuiHatano", "AoiSola")
  var name = arr(Random.nextInt(arr.length))

  name match {
    case "YoshizawaAkiho" =>
      println("吉泽老师...")
    case "YuiHatano" => println("波多老师...")
    // `_` matches any other value.
    case _ => println("真不知道你们在说什么...")
  }
}

/**
 * Matching on the runtime type of a value, with an optional guard.
 * A value that matches no earlier case reaches `case _`, which throws.
 */
object CaseDemo02 extends App {
  val arr: Array[Any] = Array("hello", 1, 2.0, CaseDemo02)
  val v = arr(Random.nextInt(arr.length))
  //val v: Any = 2.0
  println(v)

  v match {
    case x: Int => println("Int" + x)
    // The guard restricts this case to non-negative doubles.
    case y: Double if (y >= 0) => println("Double" + y)
    case z: String => println("String" + z)
    case _ => throw new Exception("not match exception")
  }
}

/** Matching on the structure of arrays, lists and tuples. */
object CaseDemo03 extends App {
  val arr = Array(1, 2, 3, 4, 5, 6)
  arr match {
    // Exactly three elements, the first being 1.
    case Array(1, a, b) => println(a + " " + b)
    // Exactly one element, 0.
    case Array(0) => println("only 0")
    // Starts with 0, followed by any number of elements.
    case Array(0, _*) => println("0......")
    case _ => println("something else")
  }

  val list = List(0, -9, 9)
  list match {
    // A list holding only 0.
    case 0 :: Nil => println("only 0")
    // A list of exactly two elements.
    case x :: y :: Nil => println(s"x: $x y: $y")
    // A list whose head is 0, with any tail.
    case 0 :: tail => println("0......")
    case _ => println("something else")
  }

  val tumple = (1, 2, 3)
  tumple match {
    case (2, a, b) => println(s"2,$a,$b")
    case (_, z, 5) => println(z)
    case _ => println("else")
  }
}
// 29. Case classes and case objects
package cn._51doit.day03.cs

import scala.util.Random

// Case classes and case objects are designed for pattern matching.
// A case class can have many instances and each instance carries data;
// a case object is a singleton and carries no data.
// Case class instances can be created without `new`.

// Case classes
case class SubmitTask(id: String, name: String)

case class HeartBeat(time: Long)

// Case object
case object CheckTimeOutTask

/** Picks two random messages and pattern-matches each against the three message kinds. */
object CaseDemo4 extends App {
  val arr = Array(CheckTimeOutTask, new HeartBeat(123456), HeartBeat(678910), SubmitTask("123456", "ABCABC"))

  val element = arr(Random.nextInt(arr.length))
  val r = arr(Random.nextInt(arr.length))
  println(element)
  println(r)

  /**
   * Prints the payload of `message`; `sep` is placed between the id and the
   * name of a SubmitTask.
   */
  private def report(message: Any, sep: String): Unit = message match {
    case SubmitTask(id, name) => println(s"$id$sep$name")
    case HeartBeat(time) => println(time)
    case CheckTimeOutTask => println("check")
  }

  report(element, ",")
  report(r, ", ")
}
// 30. Option, Some and None
package cn._51doit.day03.cs

/**
 * Three ways to read a value from a Map: direct lookup, `get` (returns an
 * Option) with a pattern match, and `getOrElse`.
 *
 * The locals were declared with `var` although they are never reassigned;
 * they are `val` now.
 */
object OptionDemo {
  def main(args: Array[String]): Unit = {
    val map = Map("a" -> 1, "b" -> 2, "c" -> 3)

    // Direct lookup with map(key).
    val direct: Int = map("b")
    println(direct)

    // get wraps the result: Some(value) when the key exists, None otherwise.
    val opt: Option[Int] = map.get("b")
    val matched = opt match {
      case Some(x) => x
      case None => 0
    }
    println(matched)

    // getOrElse supplies the fallback in one call.
    val withDefault = map.getOrElse("b", -1)
    println(withDefault)
  }
}
// 31. Partial functions
package cn._51doit.day03.cs

/**
 * Compares a method of type `PartialFunction[String, Int]`, written as a
 * bare block of `case` clauses, with an ordinary method that uses `match`.
 *
 * `main` used the deprecated procedure syntax; it now declares `: Unit =`.
 */
object PartialFuncDemo {

  /** Partial function: "one" -> 1, "two" -> 2, anything else -> -1. */
  def f1: PartialFunction[String, Int] = {
    case "one" => 1
    case "two" => 2
    case _ => -1
  }

  /** The same mapping written as an ordinary method with `match`. */
  def f2(num: String): Int = num match {
    case "one" => 1
    case "two" => 2
    case _ => -1
  }

  // A method whose type is PartialFunction is what these notes call a partial function.
  /** Partial function: "one" -> 1, "two" -> 2, anything else -> -1. */
  def func1: PartialFunction[String, Int] = {
    case "one" => 1
    case "two" => 2
    case _ => -1
  }

  /** The same mapping written as an ordinary method with `match`. */
  def func2(num: String): Int = num match {
    case "one" => 1
    case "two" => 2
    case _ => -1
  }

  def main(args: Array[String]): Unit = {
    println(f1("two"))
    println(f2("one"))
    println(func1("two"))
    println(func2("two"))
  }
}
// 32. Understanding functions in depth
package cn._51doit.day04

/**
 * A closer look at what a function is.
 *
 * A function is a value of a reference type, and it can be written in
 * several equivalent forms; the vals below show those forms side by side.
 *
 * Under the hood each one is an instance of a FunctionN implementation,
 * i.e. an object on the heap, and the function's name (the val) is a
 * reference to that object.
 */
object FunctionInDeep125 {

  def main(args: Array[String]): Unit = {
    val swapped = f4(5, 6.6)
    println(swapped)

    val numbers = Array(1, 2, 3, 4, 5)
    // `_ * 10` is itself a function value handed to `map`.
    numbers.map(_ * 10)
  }

  // Lambda with annotated parameters; the function type is inferred.
  val f1 = (x: Int, y: Double) => (y, x)

  // Function type spelled out on the val, parameters annotated as well.
  val f2: (Int, Double) => (Double, Int) = (x: Int, y: Double) => (y, x)

  // Plain values for comparison: inferred type versus annotated type.
  val i = 10
  val j: Int = 10

  // Function type on the val; parameter types are inferred from it.
  val f3: (Int, Double) => (Double, Int) = (x, y) => (y, x)

  // A function is a reference type: anonymous subclass of Function2.
  val f4 = new Function2[Int, Double, (Double, Int)] {
    override def apply(v1: Int, v2: Double): (Double, Int) = (v2, v1)
  }

  // `(Int, Double) => (Double, Int)` is just another spelling of Function2[...].
  val f5 = new ((Int, Double) => (Double, Int)) {
    override def apply(v1: Int, v2: Double): (Double, Int) = (v2, v1)
  }

  // Function2 on both the declared type and the instantiated type.
  val f6: Function2[Int, Double, (Double, Int)] =
    new Function2[Int, Double, (Double, Int)] {
      override def apply(v1: Int, v2: Double): (Double, Int) = (v2, v1)
    }

  // Arrow syntax on both sides.
  val f7: (Int, Double) => (Double, Int) =
    new ((Int, Double) => (Double, Int)) {
      override def apply(v1: Int, v2: Double): (Double, Int) = (v2, v1)
    }

  // Function2 as the declared type, arrow syntax for the instance.
  val f8: Function2[Int, Double, (Double, Int)] =
    new ((Int, Double) => (Double, Int)) {
      override def apply(v1: Int, v2: Double): (Double, Int) = (v2, v1)
    }

  // Same as f8, with the result tuple written as Tuple2[Double, Int].
  val f9: Function2[Int, Double, Tuple2[Double, Int]] =
    new ((Int, Double) => (Double, Int)) {
      override def apply(v1: Int, v2: Double): (Double, Int) = (v2, v1)
    }

  // One-argument function: anonymous subclass of Function1.
  val f10: Function1[Int, String] = new Function1[Int, String]() {
    override def apply(t: Int): String = t.toString
  }
}
33.自定义map,filter,reduce方法
package cn._51doit.day04;

/**
 * A String-to-String transformation that callers pass into {@link MyList#map}.
 */
public interface MapFunction {

    /**
     * The abstract method that defines the contract.
     *
     * @param word the input word
     * @return the transformed word
     */
    String apply(String word);
}

package cn._51doit.day04;

import java.util.ArrayList;
import java.util.List;

/**
 * An {@link ArrayList} extended with higher-order methods in the style of
 * Scala collections. It inherits every List method and adds {@code map} and
 * {@code filter}. Both return a {@code MyAdvList}, so calls can be chained
 * (for example {@code list.map(f).filter(p)}); the result is still a
 * {@code List}, so callers that assign it to {@code List<R>} keep working.
 *
 * @param <T> element type
 */
public class MyAdvList<T> extends ArrayList<T> {

    /**
     * Applies {@code fun} to every element and collects the results, in
     * order, into a new list. This list is not modified.
     *
     * @param fun the logic supplied by the caller
     * @param <R> element type of the result
     * @return a new list holding the mapped elements
     */
    public <R> MyAdvList<R> map(Function1<T, R> fun) {
        // A new list receives the new data.
        MyAdvList<R> newList = new MyAdvList<R>();
        // Iterating the old list means iterating this object.
        for (T t : this) {
            newList.add(fun.apply(t));
        }
        return newList;
    }

    /**
     * Keeps the elements for which {@code fun} returns {@code true}.
     * This list is not modified.
     *
     * @param fun the predicate supplied by the caller
     * @return a new list holding the elements that passed the predicate
     */
    public MyAdvList<T> filter(Function1<T, Boolean> fun) {
        MyAdvList<T> newList = new MyAdvList<T>();
        for (T t : this) {
            // Boolean.TRUE.equals avoids a NullPointerException from
            // auto-unboxing when the predicate returns null.
            if (Boolean.TRUE.equals(fun.apply(t))) {
                newList.add(t);
            }
        }
        return newList;
    }

    // TODO: a custom reduce is still to be written.
}

package cn._51doit.day04;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Uses Java to build something similar to Scala's map method.
 */
public class JavaMapDemo {

    public static void main(String[] args) {

        List<String> words = Arrays.asList("Spark", "hadoop", "Flink", "hive", "hbase");

        // Instead of writing a for loop with its own logic every time,
        // the loop is wrapped in a map method and only the logic is passed in.
        MyList myList = new MyList(words);

        MapFunction fun1 = new MapFunction() {
            @Override
            public String apply(String word) {
                return "doit" + word.toUpperCase();
            }
        };

        MapFunction fun2 = new MapFunction() {
            @Override
            public String apply(String word) {
                return word.toLowerCase();
            }
        };

        List<String> newList1 = myList.map(fun1);
        for (String s : newList1) {
            System.out.println(s);
        }

        List<String> newList2 = myList.map(fun2);
        for (String s : newList2) {
            System.out.println(s);
        }

        // The same calls with a method reference and a lambda expression.
        List<String> map1 = myList.map(String::toUpperCase);
        List<String> map2 = myList.map(b -> b.toLowerCase());
        map1.forEach(System.out::println);
        map2.forEach(System.out::println);

        // With Java 8 streams and lambdas a hand-written MyList is not needed.
        words.stream().map(w -> w.toUpperCase() + "abcabc").forEach(w -> System.out.println(w));
        words.stream().map(w -> w.toLowerCase() + "abcabc").forEach(w -> System.out.println(w));
        // filter keeps the words starting with "h"; mapping with the predicate
        // would only print true/false for each word.
        words.stream().filter(w -> w.startsWith("h")).forEach(w -> System.out.println(w));
    }
}

package cn._51doit.day04;

/**
 * A one-argument function from {@code T} to {@code R}.
 *
 * @param <T> argument type
 * @param <R> result type
 */
public interface Function1<T, R> {

    R apply(T t);
}

package cn._51doit.day04;

import java.util.List;

/**
 * Exercises {@link MyAdvList#map} and {@link MyAdvList#filter} with
 * anonymous {@link Function1} implementations.
 */
public class MyAdvListDemo {

    public static void main(String[] args) {

        MyAdvList<String> myList = new MyAdvList<String>();
        myList.add("spark");
        myList.add("filnk");
        myList.add("hive");

        List<String> newList = myList.map(new Function1<String, String>() {
            @Override
            public String apply(String s) {
                return s.toUpperCase() + "125";
            }
        });
        for (String s : newList) {
            System.out.println(s);
        }

        MyAdvList<Integer> numbers = new MyAdvList<Integer>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);

        // Integer -> Integer
        List<Integer> mapNewNumbers = numbers.map(new Function1<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) {
                return integer * 10;
            }
        });
        for (Integer mapNewNumber : mapNewNumbers) {
            System.out.println(mapNewNumber);
        }

        // Integer -> String: the result element type may differ from the input.
        List<String> mapNewNumber = numbers.map(new Function1<Integer, String>() {
            @Override
            public String apply(Integer integer) {
                return integer * 10 + "hello";
            }
        });
        for (String s : mapNewNumber) {
            System.out.println(s);
        }

        // Keep the even numbers only.
        List<Integer> newListabc = numbers.filter(new Function1<Integer, Boolean>() {
            @Override
            public Boolean apply(Integer integer) {
                return integer % 2 == 0;
            }
        });
        for (Integer abc : newListabc) {
            System.out.println(abc);
        }
    }
}

package cn._51doit.day04;

import java.util.ArrayList;
import java.util.List;

/**
 * Wraps a list of words and offers a {@code map} method over it.
 */
public class MyList {

    private final List<String> words;

    public MyList(List<String> words) {
        this.words = words;
    }

    /**
     * Applies {@code fun} to every word and returns the results in order.
     *
     * @param fun the function supplied by the caller
     * @return a new list with the transformed words
     */
    public List<String> map(MapFunction fun) {
        // A new list stores the new data.
        ArrayList<String> newList = new ArrayList<String>();
        // Walk the old list, apply the function, collect the new word.
        for (String word : words) {
            newList.add(fun.apply(word));
        }
        return newList;
    }
}
34.akkarpc
1.Actor就是通过发送消息实现并发
2.一个人就相当于一个Actor
3.Actor和Actor之间是可以发送消息的,自己也可以给自己发送消息
4.消息是有类型的,里面封装的数据(case class)
5.Actor是有生命周期的,是由ActorSystem创建的,ActorSystem负责管理和监控Actors,ActorSystem是单例的(object),Actor是多例的
6.消息的接收者可以将消息返回给消息的发送者
akkarpc远程通信机制
1.先启动master;
2.检测超时的worker;
3.启动worker;
4.master与worker建立连接;
5.worker向master发送注册信息;
6.master接收到worker的注册信息后,向worker返回注册成功的消息;
7.worker接收到master返回注册成功的消息后,定期向master发送心跳
package com._51doit.akka.rpc

import akka.actor.{Actor, ActorSystem, Cancellable, Props, SupervisorStrategy}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

/**
 * Master actor: records the workers that register with it, refreshes their
 * heartbeat timestamps, and periodically drops workers whose last heartbeat
 * is older than CHECK_INTERVAL milliseconds.
 */
class Master extends Actor {

  val idToWorker = new mutable.HashMap[String, WorkerInfo]()

  // Period of the timeout check and maximum heartbeat age, in milliseconds.
  val CHECK_INTERVAL = 15000

  // Handle of the periodic check so that it can be cancelled in postStop.
  private var checkTimer: Option[Cancellable] = None

  /** Starts the timer that makes this actor send CheckTimeOutWorker to itself. */
  override def preStart(): Unit = {
    import context.dispatcher
    checkTimer = Some(
      context.system.scheduler.schedule(0.milliseconds, CHECK_INTERVAL.milliseconds, self, CheckTimeOutWorker)
    )
  }

  /** Cancels the timer so it does not keep firing after a stop or restart. */
  override def postStop(): Unit = {
    checkTimer.foreach(_.cancel())
    checkTimer = None
  }

  /** Message handler of the actor. */
  override def receive: Receive = {

    // Registration message sent by a Worker.
    case RegisterWorker(workerId, memory, cores) =>
      val workerInfo = new WorkerInfo(workerId, memory, cores)
      // Count registration as the first sign of life. Otherwise the timestamp
      // stays 0 and a timeout check that arrives before the first heartbeat
      // removes the worker, after which its heartbeats are ignored for good.
      workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      idToWorker(workerId) = workerInfo
      // Tell the Worker that registration succeeded.
      sender() ! RegisteredWorker

    // Heartbeat sent periodically by a Worker.
    case Heartbeat(workerId) =>
      // Refresh the timestamp only for workers that are still registered.
      idToWorker.get(workerId).foreach { workerInfo =>
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    // Message this actor sends to itself to look for timed-out workers.
    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      // Collect the ids first, then remove, so the map is not changed while
      // it is being traversed.
      val deadWorkerIds = idToWorker.values
        .filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
        .map(_.workerId)
        .toList
      deadWorkerIds.foreach(id => idToWorker -= id)
      println(s"current alive number of worker is : ${idToWorker.size}")
  }
}

object Master {

  val MASTER_ACTOR_SYSTEM_NAME = "MasterActorSystem"
  val MASTER_ACTOR_NAME = "MasterActor"

  /**
   * Starts the master.
   *
   * @param args host and port the master binds to
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "usage: Master <host> <port>")
    val host = args(0)
    val port = args(1).toInt
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = $host
         |akka.remote.netty.tcp.port = $port
      """.stripMargin)
    // Create the ActorSystem (one per process is enough).
    val masterActorSystem = ActorSystem(MASTER_ACTOR_SYSTEM_NAME, config)
    // Create the Master actor.
    masterActorSystem.actorOf(Props[Master], MASTER_ACTOR_NAME)
  }
}

package com._51doit.akka.rpc

// Worker -> Master: registration message
case class RegisterWorker(workerId: String, memory: Int, cores: Int)

// Master -> Worker: registration succeeded
case object RegisteredWorker

// Worker -> Master: heartbeat
case class Heartbeat(workerId: String)

// Worker -> itself: time to send a heartbeat
case object SendHeartbeat

// Master -> itself: time to check for timed-out workers
case object CheckTimeOutWorker

package com._51doit.akka.rpc

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Cancellable, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * Worker actor. On start it looks up the Master and sends it a registration
 * message (worker id, memory, cores). Once the Master confirms, it sends a
 * heartbeat every HEARTBEAT_INTERVAL milliseconds.
 */
class Worker(val masterHost: String, val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  var masterActorRef: ActorSelection = _

  val workerId = UUID.randomUUID().toString

  // Period between two heartbeats, in milliseconds.
  val HEARTBEAT_INTERVAL = 10000

  // Handle of the heartbeat timer so that it can be cancelled.
  private var heartbeatTimer: Option[Cancellable] = None

  /** Resolves the Master by its remote path and registers with it. */
  override def preStart(): Unit = {
    masterActorRef = context.actorSelection(s"akka.tcp://${Master.MASTER_ACTOR_SYSTEM_NAME}@$masterHost:$masterPort/user/${Master.MASTER_ACTOR_NAME}")
    masterActorRef ! RegisterWorker(workerId, memory, cores)
  }

  /** Cancels the heartbeat timer so it does not keep firing after a stop or restart. */
  override def postStop(): Unit = {
    heartbeatTimer.foreach(_.cancel())
    heartbeatTimer = None
  }

  override def receive: Receive = {

    // Master confirmed the registration: start the heartbeat timer.
    case RegisteredWorker =>
      import context.dispatcher
      // Drop a previous timer first so a repeated confirmation cannot leave
      // two timers running.
      heartbeatTimer.foreach(_.cancel())
      heartbeatTimer = Some(
        context.system.scheduler.schedule(0.milliseconds, HEARTBEAT_INTERVAL.milliseconds, self, SendHeartbeat)
      )

    // Periodic message this actor sends to itself.
    case SendHeartbeat =>
      // Forward a heartbeat to the Master.
      masterActorRef ! Heartbeat(workerId)
  }
}

object Worker {

  val WORKER_ACTOR_SYSTEM_NAME = "WorkerActorSystem"
  val WORKER_ACTOR_NAME = "WorkerActor"

  /**
   * Starts a worker.
   *
   * @param args master host, master port, worker host, worker port, memory, cores
   */
  def main(args: Array[String]): Unit = {
    require(
      args.length >= 6,
      "usage: Worker <masterHost> <masterPort> <workerHost> <workerPort> <memory> <cores>"
    )
    val masterHost = args(0)
    val masterPort = args(1).toInt
    val workerHost = args(2)
    val workerPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = $workerHost
         |akka.remote.netty.tcp.port = $workerPort
      """.stripMargin)
    val workerActorSystem = ActorSystem(WORKER_ACTOR_SYSTEM_NAME, config)
    workerActorSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)), WORKER_ACTOR_NAME)
  }
}

package com._51doit.akka.rpc

/**
 * What the Master stores about one registered worker.
 */
class WorkerInfo(val workerId: String, var memory: Int, var cores: Int) {

  // Time of the most recent heartbeat, in epoch milliseconds (0 until set).
  var lastHeartbeatTime: Long = _
}
35.隐式转换与柯里化方法
package cn._51doit.day056

/**
 * Curried methods: methods with more than one parameter list.
 */
object KelyDemo {

  def m1(x: Int, y: Int): Int = x * y

  // A curried method.
  def m2(x: Int)(y: Int): Int = x * y

  def m3(x: Int)(y: Int)(z: Int): Int = x * y * z

  def main(args: Array[String]): Unit = {

    val r1 = m1(5, 6)
    println(r1)

    val r2 = m2(5)(6)
    println(r2)

    val r3 = m3(5)(6)(7)
    println(r3)

    // Standard-library methods that are curried themselves.
    val arr = Array(1, 2, 3, 4, 5)
    arr.fold(0)(_ + _)
    arr.aggregate(0)(_ + _, _ + _)

    // Turning the method into a function value: Int => (Int => Int).
    val f2 = m2 _
    println(f2)

    // Supplying only the first parameter list, followed by space-underscore,
    // yields a function that still expects the remaining argument.
    val f3 = m2(5) _
    f3(8) // 5 * 8
  }
}

package cn._51doit.day056

/**
 * Implicit parameters combined with default arguments.
 */
object ImplicitVariableDemo {

  // Implicit values get an explicit type so that resolution does not depend
  // on inference.
  implicit val i: Int = 111

  // Both arguments must be passed.
  def m(x: Int, y: Int): Int = {
    x * y
  }

  def m1(x: Int, y: Int = 8): Int = {
    x * y
  }

  // Curried method whose second parameter list is implicit and also has a
  // default value, so the whole second list may be left out at the call site.
  def m2(x: Int)(implicit y: Int = 8): Int = {
    x * y
  }

  def main(args: Array[String]): Unit = {
    // An implicit parameter is filled in at compile time: the compiler looks
    // in the surrounding context for an implicit value of the same type and
    // passes it when one exists.
    // Priority: explicitly passed argument > implicit value of a matching
    // type in context > default value declared on the method.
    val r4 = m2(6)(1111)
    println(r4)
  }
}

package cn._51doit.day056

/**
 * [T <: Comparable[T]]  upper bound    (Java: <T extends Comparable<T>>)
 * [T >: Comparable[T]]  lower bound    (Java has no `T super X` on a type
 *                                       parameter, only the wildcard `? super X`)
 * [T <% Comparable[T]]  view bound
 * [T : Comparable]      context bound
 * [-T]                  contravariant, for method input parameters
 * [+T]                  covariant, for method results
 */
class Pair[T <% Comparable[T]] {

  /** Returns the larger of the two values according to `compareTo`. */
  def bigger(first: T, second: T): T = {
    if (first.compareTo(second) > 0) first else second
  }
}

object Pair {

  def main(args: Array[String]): Unit = {
    // Scala's Int does not implement Comparable, while Java's Integer does.
    // With a view bound instead of an upper bound, Int can be compared too,
    // through an implicit conversion.
    val p1 = new Pair[Int]
    val r = p1.bigger(10, 8)
    println(r)
  }
}

package cn._51doit.day056

import java.io.File

import scala.io.Source
import MyContext._

/**
 * Wrapper that adds a `read` method to java.io.File.
 */
class RichFile(val file: File) {

  /** Reads the whole file into a String and closes the underlying source. */
  def read(): String = {
    val source = Source.fromFile(file)
    // Close the source even when reading fails; otherwise the file handle leaks.
    try source.mkString finally source.close()
  }
}

object RichFile {

  def main(args: Array[String]): Unit = {
    val file = new File("/Users/xing/Desktop/word.txt")
    // `1 to 10` compiles although Int has no `to` method, because RichInt
    // adds one. In the same way File has no `read` method, and RichFile can
    // add one.
    // Explicit wrapping (decorator pattern) would look like this:
    //   val richFile = new RichFile(file)
    //   val content = richFile.read()
    // NOTE(review): the call below needs an implicit File => RichFile
    // conversion, presumably provided by MyContext - confirm.
    val content = file.read()
    println(content)
  }
}

package cn._51doit.day056

class Boy(val name: String, var fv: Double) {

  override def toString = s"Boy($name, $fv)"
}

package cn._51doit.day056

/**
 * A Girl carries her own ordering: ascending by age.
 */
class Girl(val name: String, var age: Int) extends Ordered[Girl] {

  override def compare(that: Girl): Int = {
    // Integer.compare avoids the overflow that `this.age - that.age` has
    // when the two ages are far apart.
    Integer.compare(this.age, that.age)
  }

  override def toString = s"Girl($name, $age)"
}

package cn._51doit.day056

// The comparison rule is not written into this class, yet instances still
// need to be comparable.
class Girl2(val name: String, var fv: Double) {

  override def toString = s"Girl2($name, $fv)"
}

package cn._51doit.day056

/**
 * Upper bound: T itself must be an Ordered[T].
 */
class MissRight[T <: Ordered[T]] {

  /** Returns the greater of the two values. */
  def choose(first: T, second: T): T = {
    if (first > second) first else second
  }
}

object MissRight {

  def main(args: Array[String]): Unit = {
    val missRight = new MissRight[Girl]
    val g1 = new Girl("aaa", 18)
    val g2 = new Girl("bbb", 19)
    val g = missRight.choose(g1, g2)
    println(g)
  }
}

package cn._51doit.day056

// View bound: an implicit conversion turns T into Ordered[T].
// A view bound needs an implicit conversion method or function in scope.
class Pair2[T <% Ordered[T]] {

  /** Returns the greater of the two values. */
  def select(first: T, second: T): T = {
    if (first > second) first else second
  }
}

object Pair2 {

  def main(args: Array[String]): Unit = {
    import MyContext.girlToOrderedGirl
    val p = new Pair2[Girl2]
    val g1 = new Girl2("ab", 88.98)
    val g2 = new Girl2("hatna", 99.99)
    val g = p.select(g1, g2)
    println(g)
  }
}

package cn._51doit.day056

// Context bound: also built on implicits.
// A context bound needs an implicit value (here an Ordering[T]) in scope.
class Pair3[T : Ordering] {

  /** Returns the greater of the two values according to the implicit Ordering. */
  def choose(first: T, second: T): T = {
    val ord: Ordering[T] = implicitly[Ordering[T]]
    if (ord.gt(first, second)) first else second
  }
}

object Pair3 {

  def main(args: Array[String]): Unit = {
    import MyContext.OrderingBoy
    val p = new Pair3[Boy]
    val b1 = new Boy("laoduan", 99.99)
    val b2 = new Boy("xiaozhao", 9999.99)
    val b = p.choose(b1, b2)
    println(b)
  }
}

package cn._51doit.day056

// Neither a view bound nor a context bound, yet still driven by implicits:
// the same effect is achieved with curried methods.
class Pair4[T] {

  // A curried method taking an implicit function or method works like a
  // view bound.
  def select(first: T, second: T)(implicit f: T => Ordered[T]): T = {
    if (first > second) first else second
  }

  // A curried method taking an implicit value works like a context bound.
  def choose(first: T, second: T)(implicit ord: Ordering[T]): T = {
    if (ord.gt(first, second)) first else second
  }
}

object Pair4 {

  def main(args: Array[String]): Unit = {

    val arr = Array[Int](1, 2, 3, 9, 8, 7, 5)
    // `sorted` takes an implicit Ordering[Int].
    arr.sorted

    val p = new Pair4[Boy]

    import MyContext.OrderingBoy

    val g1 = new Boy("ab", 88.98)
    val g2 = new Boy("hatna", 99.99)

    val g = p.choose(g1, g2)
    println(g)
  }
}