Local模式就是运行在一台计算机 上的模式,通常就是用于在本机.上练手和测试。 它可以通过以下集中方式设置Master。 local:所有计算都运行在一- 个线程当中,没有任何并行计算,通常我们在本机执行 一-些测试代码,或者练手,就用这种模式; local[K]:指定使用几个线程来运行计算,比如local[4]就 是运行4个Worker线程。通常我们的Cpu有几个Core,就指定几个线程,最大化利用Cpu的计算能力; local[*]:这种模式直接帮你按照Cpu最多Cores来设置线程数了。
1)下载并解压spark安装包
wget http://archive.apache.org/dist/spark/spark-2.1.1/spark-2.1.1-bin-hadoop2.7.tgz tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/ mv spark-2.1.1-bin-hadoop2.7 spark2)官方求PI案例
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --executor-memory 1G \ --total-executor-cores 2 \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100(1)基本语法
bin/spark-submit \ --class <main-class> --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments](2)参数说明:
--master 指定Master的地址,默认为Local --class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi) --deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)* --conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value” application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar application-arguments: 传给main()方法的参数 --executor-memory 1G 指定每个executor可用内存为1G --total-executor-cores 2 指定每个executor使用的cup核数为2个3)结果展示
该算法是利用蒙特·卡罗算法求PI
1)准备文件input
cd /opt/module/spark mkdir input在input下创建3个文件1.txt和2.txt,并输入以下内容
vi 1.txt hello scala hello spark vi 2.txt hello hadoop hello zookeeper2)启动spark-shell
cd /opt/module/spark ./bin/spark-shell3)运行WordCount程序
输入sc.+tab键查看sc 的所以方法
scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect数据流分析:
textFile(“input”):读取本地文件input文件夹数据;
flatMap(_.split(" ")):压平操作,按照空格分割符将一行数据映射成一个个单词;
map((_,1)):对每一个元素操作,将单词映射为元组;
reduceByKey(+):按照key将值进行聚合,相加;
collect:将数据收集到Driver端展示。
图解:
4)IDEA运行WordCount
准备工作,创建maven项目
pom.xml加入依赖包
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> </dependency> </dependencies> <build> <finalName>WordCount</finalName> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>WordCount
package com.pyp.spark import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { //local模式 //创建SparkConf对象 //设定Spark计算框架的运行环境 //app id val confing: SparkConf=new SparkConf().setMaster("local[*]").setAppName("WordCount") //创建spark上下文对象 val sc= new SparkContext(confing) //读取文件,将文件内容一行一行的读取出来 val lines:RDD[String]=sc.textFile("input") //将一行一行的数据分解一个一个的单词 val words: RDD[String] = lines.flatMap(_.split(" ")) //为了统计方便,讲单词数据进行结构的转换 val wordToOne: RDD[(String, Int)] = words.map((_, 1)) //对转换结构后的数据进行聚合 val wordSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _) //统计结果采集后打印到控制台 val result: Array[(String, Int)] = wordSum.collect() //scala集合 result.foreach(println) //排序 val res : RDD[(String,Int)] = wordSum.sortBy(_._2,false) val resu: Array[(String, Int)] = res.collect() resu.foreach(println) } }结果
结果