学习前提:有一定的Scala基础、基本的 Linux 基础,对Spark有一定的概念,并且安装好了Spark环境。
相关环境参考教程: 1、分布式集群环境之Scala的安装与配置(Centos7) 2、分布式集群环境之Spark的安装与配置(Centos7)
a. 启动 Spark Shell
Spark-Shell是 Spark 自带的一个 Scala 交互式操作 Shell ,类似于 Python 或者其他脚本语言的 Shell ,其可以以脚本方式进行交互式执行。安装好Spark之后,在任意非bin路径,直接在Linux命令行中输入:
spark-shell即可进入 Spark Shell 界面: b. 关闭 Spark Shell 按 Ctrl + D 即可退出。
启动Spark Shell之后,可以打开一个新的终端,然后输入jps,查看目前有哪些Java进程,可以看到: 注意:此处只在master节点上启动,所以,slave1、slave2是没有SparkSubmit进程的。
a. 读取Spark内置数据 数据位置:$SPARK_HOME/data/graphx/users.txt, 如我的位置是(需替换成自己实际的路径):/home/hadoop-sny/bigdata/spark-2.2.0-bin-hadoop2.7/data/graphx/users.txt 查看内容: 也可以自己创建文件:
1,BarackObama,Barack Obama 2,ladygaga,Goddess of Love 3,jeresig,John Resig 4,justinbieber,Justin Bieber 6,matei_zaharia,Matei Zaharia 7,odersky,Martin Odersky 8,anonsys我们来统计一下users.txt文件一共有多少行,并且打印第一行内容,进入Spark Shell界面,输入内容:
var file = sc.textFile("/home/hadoop-sny/bigdata/spark-2.2.0-bin-hadoop2.7/data/graphx/users.txt") file.count() file.first()代码解释:
创建了一个 RDD file;count()获取 RDD 的行数;first()获取第一行的内容。当然,我们还可以继续执行其他操作, 比如查找有多少行含有"Obama":
file.filter(line => line.contains("Obama")).count()具体的函数、算子,就需要自己有一定的基础的,但是你可以先了解。 此代码的意思就是file的RDD调用一个过滤算子 filter ,过滤条件是判断有Obama,如果有,则保留下来,保留下来之后再进行 count 计算行数操作,最后统计结果为 1。
此案例来源于教程:IntelliJ IDEA开发Spark案例之WordCount 里0x02 编写WordCount代码的完整代码,如下为教程截图: 为了方便大家理解,此处重新启动 Spark Shell,如已启动,则按 Ctrl + D 即可退出,然后输入 spark-shell 启动,观察显示的内容: 根据回显信息可知,其实在启动 Spark Shell 的时候,已经给我们实例化出了两个非常关键的对象: SparkContext 对象(sc)、SparkSession对象(spark),此处使用到的是SparkContext 对象,那么教程里的 SparkContext 对象就不用再重新执行了,直接用就可以。
我们实际上需要执行的是下面这几句:
val textFileRDD = sc.textFile("/home/hadoop-sny/datas/word.txt") val wordRDD = textFileRDD.flatMap(line => line.split(" ")) val pairWordRDD = wordRDD.map(word => (word, 1)) val wordCountRDD = pairWordRDD.reduceByKey((a, b) => a + b) wordCountRDD.foreach(println)代码解释:
第一行:读取一个 word.txt文件,生成一个叫 textFileRDD 的RDD第二行: textFileRDD 调用flatMap算子,对每行进行切分操作,切割符是空格,生成内容为 wordRDD第三行:wordRDD 对切割后的每一个单词进行map映射操作,给每一个单词映射成(word, 1)的形式,生成内容为 pairWordRDD第四行:pairWordRDD 进行 reduceByKey 操作,根据相同的 key,对 value 进行相加操作,也就是统计操作,返回值是 wordCountRDD第五行:打印 wordCountRDD 的内容,也就是查看统计结果,foreach 为action算子,如无Action算子,无法执行 Spark 作业。 创建一个需要统计的新文件 vi /home/hadoop-sny/datas/word.txt添加内容:
shao shao shao nai yi yi nai hello hello hi注意:因为我的用户名为 hadoop-sny,所以我的 ~ 表示:/home/hadoop-sny/,用户名不同,则不同,自己需要特别留意此波浪线。
执行结果如下: 其实,可以一步到位,只是不美观:
sc.textFile("/home/hadoop-sny/datas/word.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b).foreach(println)还可以更简洁点:
sc.textFile("/home/hadoop-sny/datas/word.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)前面的Spark Shell实际上使用的是Scala交互式Shell,实际上 Spark 也提供了一个用 Python 交互式Shell,即Pyspark。 启动: pyspark 需要注意的是此处Spark内置的Python是2.7.5版本的,关闭也是按 Ctrl + D 即可。
后面的教程主要是使用 Spark-Shell ,对于 Pyspark 大家可以自行查找资料学习:官方文档Spark Python API
作者简介:邵奈一 全栈工程师、市场洞察者、专栏编辑 | 公众号 | 微信 | 微博 | | 简书 |
福利: 邵奈一的技术博客导航 邵奈一 原创不易,如转载请标明出处。