SparkSql将数据写入到MySQL中

    技术2022-07-11  85

    通过IDEA编写SparkSql代码

    将person.txt文本文件写入到mysql数据库表当中去

    思路解析:通过sparkContext读取文件,然后转换成RDD,将RDD转换成为DataFrame,然后注册成为一张表,查询出来数据,插入保存到mysql当中去

    代码实现:

    import java.util.Properties import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} case class Peson(id:Int,name:String,age:Int) object Spark_Mysql { /** * 读取文本文件数据,然后写入到mysql数据库表当中去 * */ def main(args: Array[String]): Unit = { //获取sparkSession val sparkSession: SparkSession = SparkSession.builder().appName("spark_Mysql").master("local[2]").getOrCreate() //通过sparkSession得到sparkContext val sparkContext: SparkContext = sparkSession.sparkContext //通过sparkContext 读取文本文件内容,得到RDD val arrRDD: RDD[Array[String]] = sparkContext.textFile("file:///F:\\person.txt").map(x => x.split(" ")) //通过RDD,配合样例类,将我们的数据转换成样例类对象 val personRDD: RDD[Person] = arrRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt)) //导入sparkSession当中的隐式转换,将我们的样例类对象转换成DataFrame import sparkSession.implicits._ val personDF: DataFrame = personRDD.toDF() //打印dataFrame当中的数据 val personDFShow: Unit = personDF.show() //将DataFrame注册成为一张表模型 val personView: Unit = personDF.createTempView("person_view") //获取表当中的数据 val result: DataFrame = sparkSession.sql("select * from person_view") //获取mysql连接 val url ="jdbc:mysql://localhost:3306/userdb" val tableName = "person" val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","root123") //将我们查询的结果写入到mysql当中去 val jdbc: Unit = result.write.mode(SaveMode.Append).jdbc(url,tableName,properties) sparkContext.stop() sparkSession.close() } }

    所需pom.xml文件

    <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <parent>         <artifactId>bigdata</artifactId>         <groupId>bigdata</groupId>         <version>1.0-SNAPSHOT</version>     </parent>     <modelVersion>4.0.0</modelVersion>

        <artifactId>day03_spark</artifactId>

        <properties>         <scala.version>2.11.8</scala.version>         <spark.version>2.2.0</spark.version>     </properties>     <dependencies>         <dependency>             <groupId>org.scala-lang</groupId>             <artifactId>scala-library</artifactId>             <version>${scala.version}</version>

            </dependency>         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-core_2.11</artifactId>             <version>${spark.version}</version>

            </dependency>         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-sql_2.11</artifactId>             <version>${spark.version}</version>

            </dependency>         <dependency>             <groupId>org.apache.hadoop</groupId>             <artifactId>hadoop-client</artifactId>             <version>2.7.5</version>

            </dependency>

          <!--  <scope>可以设置打包的级别         默认值 :compile         provided :仅仅在开发时候需要,打包不需要         compile:开发打包时候都要         test   :仅在测试时候有用       </scope>-->         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-hive_2.11</artifactId>             <version>2.2.0</version>         </dependency>

            <dependency>             <groupId>mysql</groupId>             <artifactId>mysql-connector-java</artifactId>             <version>5.1.38</version>         </dependency>

        </dependencies>     <build>         <sourceDirectory>src/main/scala</sourceDirectory>         <testSourceDirectory>src/test/scala</testSourceDirectory>         <plugins>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-compiler-plugin</artifactId>                 <version>3.0</version>                 <configuration>                     <source>1.8</source>                     <target>1.8</target>                     <encoding>UTF-8</encoding>                     <!--    <verbal>true</verbal>-->                 </configuration>             </plugin>             <plugin>                 <groupId>net.alchim31.maven</groupId>                 <artifactId>scala-maven-plugin</artifactId>                 <version>3.2.0</version>                 <executions>                     <execution>                         <goals>                             <goal>compile</goal>                             <goal>testCompile</goal>                         </goals>                         <configuration>                             <args>                                 <arg>-dependencyfile</arg>                                 <arg>${project.build.directory}/.scala_dependencies</arg>                             </args>                         </configuration>                     </execution>                 </executions>             </plugin>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-shade-plugin</artifactId>                 <version>3.1.1</version>                 <executions>                     <execution>                         <phase>package</phase>                         <goals>                             <goal>shade</goal>                         </goals>                         <configuration>                             <filters>                                 <filter>                                     <artifact>*:*</artifact>                                     <excludes>                                         <exclude>META-INF/*.SF</exclude>                                         <exclude>META-INF/*.DSA</exclude>                                         <exclude>META-INF/*.RSA</exclude>                                     </excludes>                                 </filter>                             </filters>                             <transformers>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">                                     <mainClass></mainClass>                                 </transformer>                             </transformers>                         </configuration>                     </execution>                 </executions>             </plugin>         </plugins>     </build>

    </project>

     打包提交运行: spark2-submit --class Spark_Mysql --master spark://cdh01:7077 /data/SparkMysql-1.0-SNAPSHOT.jar

     运行结果:

    Processed: 0.009, SQL: 9