Spark RDD QuickStart


    Preface

    This chapter gives a QuickStart for Spark RDD and records the related operating steps and the errors encountered along the way.


    Spark Cluster and Local Cluster

    Local cluster: configure the spark-env.sh and slaves files; both live in the conf directory. For the remaining setup steps, see the earlier posts in this series.

    slaves file:

        # slaves file
        # A Spark Worker will be started on each of the machines listed below.
        localhost
        #192.168.31.80

    spark-env.sh file:

        # spark-env.sh file
        export SCALA_HOME=/Users/Sean/Software/Scala/scala-2.11.8
        #export SCALA_HOME=/Users/Sean/Software/Scala/scala-2.11.7
        export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home
        export SPARK_MASTER_IP=127.0.0.1
        export SPARK_LOCAL_IP=127.0.0.1
        export SPARK_WORKER_MEMORY=1g
        #export SPARK_WORKER_MEMORY=512M
        export SPARK_EXECUTOR_CORES=2
        export SPARK_MASTER_PORT=7077
        #export master=spark://192.168.31.80:7070

    This configures each worker node with 1 GB of memory and 2 executor cores.

    Start the cluster with sbin/start-all.sh:

        cd sbin
        ./start-all.sh

    Java RDD Program

    Running locally vs. on the cluster:

        SparkConf sparkConf = new SparkConf().setAppName("Spark-Overview-WordCount");
        if (runLocalFlag) {
            sparkConf.setMaster("local[2]");
        } else {
            sparkConf.setMaster("spark://localhost:7077")
                .setJars(new String[] {"/Users/sean/Documents/Gitrep/bigdata/spark/target/spark-demo.jar"});
        }
        // obtain the context object
        JavaSparkContext context = new JavaSparkContext(sparkConf);

    Local mode uses setMaster("local[2]"); cluster mode uses setMaster("spark://localhost:7077") together with setJars(new String[] {"/Users/sean/Documents/Gitrep/bigdata/spark/target/spark-demo.jar"}).

    Note: in cluster mode the code must be packaged into a jar and its path passed to setJars; submitting to a remote cluster is done with the spark-submit command.

    RDD operations:

        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> distData = sc.parallelize(data);
        int sum = distData.reduce((a, b) -> a + b);
        //distData.map((x) -> x * 10);
        //distData.map((x) -> x).reduce((a, b) -> (a + b));
        System.out.println(sum);

    The example exercises the map and reduce operators.
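    For completeness, below is a minimal, self-contained sketch that wires map and reduce together and runs in local mode. The class name MapReduceDemo and the app name are illustrative and not from the original example.

        import java.util.Arrays;
        import java.util.List;

        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;

        public class MapReduceDemo {
            public static void main(String[] args) {
                // Local mode: no jar packaging or setJars(...) needed.
                SparkConf conf = new SparkConf()
                        .setAppName("Spark-Overview-MapReduce")
                        .setMaster("local[2]");
                JavaSparkContext sc = new JavaSparkContext(conf);

                List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
                JavaRDD<Integer> distData = sc.parallelize(data);

                // map is a lazy transformation; reduce is an action that triggers the job.
                JavaRDD<Integer> scaled = distData.map(x -> x * 10);
                int sum = scaled.reduce((a, b) -> a + b);

                System.out.println(sum); // prints 150
                sc.stop();
            }
        }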


    Q&A

    Q1: Failed to connect to master master_hostname:7077. Fix: the Web UI at localhost:8080 showed that the hostname was wrong; correct the hostname and restart the machine.
    [1]. 关于Spark报错不能连接到Server的解决办法 (Failed to connect to master master_hostname:7077)

    Q2: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field.

    Q3: org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.

    Solution (for Q2 and Q3):

    The job was submitted in cluster mode without setJars(xxxx). Package the code into a jar and submit it along with the job, or fall back to the local development mode local[2].

    [1]. 使用Idea提交Spark程序
    [2]. Spark SerializedLambda错误解决方案
    [3]. idea连接spark集群报错解析: Caused by: java.lang.ClassCastException

    Q4: Setting the Spark log output level (related keywords: org.apache.spark.SparkException: Could not find CoarseGrainedScheduler, pyspark).

        JavaSparkContext sc = new JavaSparkContext(conf);
        // Does not seem to help much in practice.
        sc.setLogLevel("WARN");
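    Since sc.setLogLevel only takes effect once the context exists, a commonly suggested alternative is to raise the root logger level before creating the SparkContext. A minimal sketch, assuming log4j 1.x is on the classpath (as it is in Spark 2.x distributions):

        import org.apache.log4j.Level;
        import org.apache.log4j.Logger;

        // Filter everything below WARN, including the driver's startup output,
        // before the SparkContext is created.
        Logger.getRootLogger().setLevel(Level.WARN);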

    Q5: No resources were allocated to the application. Increase the memory and other resources allocated to each node.
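    Alternatively, the application's own resource demands can be lowered so they fit within what the workers offer. A sketch using standard Spark properties; the values here are only examples:

        // Ask for no more than the SPARK_WORKER_MEMORY and SPARK_EXECUTOR_CORES configured above.
        SparkConf conf = new SparkConf()
                .setAppName("Spark-Overview-WordCount")
                .setMaster("spark://localhost:7077")
                .set("spark.executor.memory", "512m")  // memory per executor
                .set("spark.cores.max", "2");          // total cores the application may claim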

    Q6: Cast exception (see Q2 and Q3 for the fix).

        20/07/02 21:06:35 ERROR Executor: Exception in task 1.2 in stage 0.0 (TID 4)
        java.io.IOException: unexpected exception type
            at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1582)
            at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1154)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1817)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
            at org.apache.spark.scheduler.Task.run(Task.scala:108)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
        Caused by: java.lang.reflect.InvocationTargetException
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1148)
            ... 27 more
        Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
            at com.yanxml.bigdata.java.spark.rdd.FirstRDDDemo.$deserializeLambda$(FirstRDDDemo.java:10)
            ... 37 more