PySpark WordCount


    My first time writing a Spark job in Python:

           1) Writing Spark jobs in Python requires the pyspark library to interact with Spark;

           2) On a Windows machine, Spark also needs to be installed locally (a quick environment check is sketched below).
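
    Once pyspark is installed (for example via pip) and Spark is available locally, a small script can confirm the environment works before moving on to word count. This is only a minimal sketch assuming a local-mode setup; the app name and master string are arbitrary choices, not required values:

    from pyspark import SparkConf, SparkContext

    # Run entirely on the local machine; "local[*]" uses all available cores
    conf = SparkConf().setMaster("local[*]").setAppName("setup-check")
    sc = SparkContext(conf=conf)

    print(sc.version)                        # Spark version string, proves the context started
    print(sc.parallelize(range(10)).sum())   # 45 -- a trivial job to prove execution works

    sc.stop()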

    PySpark official docs: http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

    References: https://blog.csdn.net/lc_1123/article/details/79007231

    https://www.jianshu.com/p/b5e949261cfd

    https://blog.csdn.net/cymy001/article/details/78483723

    https://blog.csdn.net/weixin_41512727/article/details/100131995 (Windows environment setup)

     

    from pyspark import SparkConf, SparkContext
    from operator import add

    # Create the SparkConf and SparkContext
    conf = SparkConf().setMaster("local").setAppName("wordcount")
    sc = SparkContext(conf=conf)

    # An in-memory list (alternative data source)
    data = ["hello", "world", "hello", "word", "count", "count", "hello"]

    # A file on HDFS
    inputFile = "hdfs://node1.hadoop:9000/README.md"

    # Turn the data into a Spark RDD and operate on it
    # rdd = sc.parallelize(data)   # build the RDD from the Python list instead
    rdd = sc.textFile(inputFile)   # text files are read line by line

    countRdd = rdd.count()         # number of lines in the RDD
    print('countRdd:', countRdd)

    # resultRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    resultRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(add)

    # Convert the RDD to a collection and print it
    resultColl = sorted(resultRdd.collect())
    for line in resultColl:
        print(line)

    # Done
    sc.stop()
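
    If no HDFS cluster is at hand, the same pipeline can be exercised with the in-memory list by switching to the commented-out sc.parallelize(data) line. A self-contained sketch under that assumption (local master, no HDFS):

    from operator import add
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("local").setAppName("wordcount-local")
    sc = SparkContext(conf=conf)

    data = ["hello", "world", "hello", "word", "count", "count", "hello"]
    rdd = sc.parallelize(data)     # build the RDD directly from the Python list

    # flatMap is kept for symmetry with the file version; splitting a single word is a no-op here
    resultRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(add)
    for pair in sorted(resultRdd.collect()):
        print(pair)                # ('count', 2), ('hello', 3), ('word', 1), ('world', 1)

    sc.stop()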

    Notes:

    SparkContext is the core Spark object and the entry point for building an application.

    Common ways to build an RDD: from a sequence such as a Python list; by reading a file; or by deriving a new RDD from an existing one.

    Lambda expressions: in (lambda line: line.split(" ")), the line on the left is the input parameter, and line.split(" ") on the right is the expression applied to that parameter.

    Function descriptions (see the step-by-step sketch below):
    flatMap(): splits each line and flattens the results into a single sequence of words;
    map(): maps each word to a (word, 1) key/value pair;
    reduceByKey(): groups the pairs by key and sums their values;
    sorted(): sorts the collected elements;
    collect(): triggers execution and returns the results to the driver.
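
    To make the role of each function concrete, the following sketch prints the intermediate result after every transformation. The two-line sample input is made up purely for illustration:

    from pyspark import SparkConf, SparkContext

    sc = SparkContext(conf=SparkConf().setMaster("local").setAppName("wordcount-steps"))

    lines = sc.parallelize(["hello world", "hello count"])

    words = lines.flatMap(lambda line: line.split(" "))
    print(words.collect())            # ['hello', 'world', 'hello', 'count']

    pairs = words.map(lambda word: (word, 1))
    print(pairs.collect())            # [('hello', 1), ('world', 1), ('hello', 1), ('count', 1)]

    counts = pairs.reduceByKey(lambda a, b: a + b)
    print(sorted(counts.collect()))   # [('count', 1), ('hello', 2), ('world', 1)]

    sc.stop()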

