Writing SparkSQL code in IDEA
Goal: write the contents of the person.txt text file into a MySQL database table.
Approach: read the file with the SparkContext to get an RDD, convert the RDD into a DataFrame, register the DataFrame as a temporary view, query the data with SQL, and write the query result into MySQL.
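The code below expects person.txt to contain one space-separated row per person, with three columns: id, name, and age. A sample file (made-up data, purely for illustration) could look like this:

1 zhangsan 28
2 lisi 29
3 wangwu 25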
Code implementation:
import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

case class Person(id: Int, name: String, age: Int)

object Spark_Mysql {
  /**
    * Read data from a text file, then write it into a MySQL database table.
    */
  def main(args: Array[String]): Unit = {
    // Get the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("spark_Mysql").master("local[2]").getOrCreate()
    // Get the SparkContext from the SparkSession
    val sparkContext: SparkContext = sparkSession.sparkContext
    // Read the text file with the SparkContext and split each line on spaces, producing an RDD
    val arrRDD: RDD[Array[String]] = sparkContext.textFile("file:///F:\\person.txt").map(x => x.split(" "))
    // Map each array onto the Person case class
    val personRDD: RDD[Person] = arrRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // Import the SparkSession's implicit conversions to turn the RDD of case-class objects into a DataFrame
    import sparkSession.implicits._
    val personDF: DataFrame = personRDD.toDF()
    // Print the contents of the DataFrame
    personDF.show()
    // Register the DataFrame as a temporary view (table model)
    personDF.createTempView("person_view")
    // Query the data in the view
    val result: DataFrame = sparkSession.sql("select * from person_view")
    // MySQL connection settings
    val url = "jdbc:mysql://localhost:3306/userdb"
    val tableName = "person"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root123")
    // Write the query result into MySQL
    result.write.mode(SaveMode.Append).jdbc(url, tableName, properties)

    sparkContext.stop()
    sparkSession.close()
  }
}

The required pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>bigdata</artifactId>
        <groupId>bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>day03_spark</artifactId>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <!-- <scope> controls the packaging scope of a dependency. Default: compile.
             provided: needed only during development, not included in the package.
             compile:  needed both during development and when packaging.
             test:     needed only for tests. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!-- <verbal>true</verbal> -->
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Package the project and submit it for execution:

spark2-submit --class Spark_Mysql --master spark://cdh01:7077 /data/SparkMysql-1.0-SNAPSHOT.jar
Running result:
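To confirm that the rows actually landed in the MySQL person table, one option is to read the table back through the same JDBC connection and print it. The following is a minimal sketch, not part of the original program: the object name Verify_Mysql is hypothetical, and it assumes the same userdb database, person table, and credentials used above.

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

object Verify_Mysql {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().appName("verify_Mysql").master("local[2]").getOrCreate()
    // Same connection settings as in Spark_Mysql above (assumed)
    val url = "jdbc:mysql://localhost:3306/userdb"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root123")
    // Read the person table back from MySQL and print its contents
    val readBack: DataFrame = sparkSession.read.jdbc(url, "person", properties)
    readBack.show()
    sparkSession.close()
  }
}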