SparkSQL: Reading HDFS Files and Writing Out to MySQL

    Tech  2022-07-11


    Processing files stored on HDFS with SparkSQL is elegant and concise, and writing the results out to a database is simpler than doing the same with Spark RDDs. Before starting, make sure the access port of the server hosting HDFS is open (commonly port 9000; this article uses port 8020). The data used here was exported from MongoDB to HDFS; its format (BSON) differs little from JSON.

    Data Format

    {"oil":0,"simNo":"010000000002","carId":304648943587868672,"address":null,"satelliteNumber":0,"reportTime":1592443939000,"status":0,"terminalId":"3212025","longitude":119358299,"plateNo":"苏A00002","statusComments":"ACC开 定位 南纬 西经 停运状态 经纬度已经保密插件加密 车辆油路断开 车辆电路断开 车门加锁 前门开 中门开 后门开 驾驶席门开 使用GPS卫星进行定位 使用北斗卫星进行定位 ","timeIntervalStatus":null,"@version":"1","course":0,"alarmComments":null,"speed":800,"altitude":25,"latitude":32188034,"parkTime":0,"@timestamp":"2020-06-29T17:31:44.861Z","alarmFlag":0,"terminalDriver":null,"mileage":552954}
    {"oil":0,"simNo":"064606108552","carId":299525840931176448,"address":null,"satelliteNumber":0,"reportTime":1592443998000,"status":0,"terminalId":"2359641","longitude":119358135,"plateNo":"苏L75735","statusComments":"ACC开 定位 南纬 西经 停运状态 经纬度已经保密插件加密 车辆油路断开 车辆电路断开 车门加锁 前门开 中门开 后门开 驾驶席门开 使用GPS卫星进行定位 使用北斗卫星进行定位 ","timeIntervalStatus":null,"@version":"1","course":0,"alarmComments":"GNSS 天线未接或被剪断 ","speed":0,"altitude":20,"latitude":32188071,"parkTime":0,"@timestamp":"2020-06-29T17:31:44.862Z","alarmFlag":32,"terminalDriver":null,"mileage":14120}
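    A few fields in these records need decoding before they are useful: reportTime is epoch milliseconds, and longitude/latitude look like decimal degrees scaled by 1e6 (a common convention for vehicle-position protocols, but an assumption here, since the article does not state the scale). A minimal sketch of that decoding, using only java.time:

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object RecordFields {
  private val fmt =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC)

  // reportTime is epoch milliseconds; render it as a UTC timestamp string.
  def formatReportTime(reportTime: Long): String =
    fmt.format(Instant.ofEpochMilli(reportTime))

  // longitude/latitude are assumed to be degrees scaled by 1e6
  // (not stated in the article); divide to recover decimal degrees.
  def toDegrees(scaled: Long): Double = scaled / 1e6
}
```

    For the first sample record, formatReportTime(1592443939000L) yields "2020-06-18 01:32:19" and toDegrees(119358299L) yields roughly 119.358299.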

    POM Dependencies

    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.2</version>
        <exclusions>
            <exclusion>
                <artifactId>hadoop-client</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
    </dependency>
    <!-- HDFS integration -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.0</version>
    </dependency>
    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.17</version>
    </dependency>

    Main Program

    The job computes statistics over the previous day's data: it extracts the required columns and then operates on them with SQL.

    import org.apache.log4j.Logger
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    object CarsOfDay {

      private val logger = Logger.getLogger(CarsOfDay.getClass)

      def main(args: Array[String]): Unit = {
        // Simulate a local run
        System.setProperty("hadoop.home.dir", "D:/zhoubiao/hadoop_home")
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val spark = SparkSession.builder().appName("CarsOfDay").master("local[*]").getOrCreate()
        spark.sparkContext.setLogLevel("FATAL")
        spark.conf.set("spark.sql.caseSensitive", "true")
        import spark.implicits._

        // Get yesterday's date
        val longTime = CarsUtils.getYesterDay
        val yesterDay = CarsUtils.getTime(longTime, "yyyy-MM-dd")
        // Build the HDFS URL
        val url: String = "hdfs://10.20.192.56:8020/usr/locationdata/" + yesterDay
        // Skip processing if the file does not exist
        val flag = CarsUtils.hdfsIsExist(url)
        if (flag) {
          val frame: DataFrame = spark.read.json(url)
          // A date-conversion utility wrapped as a UDF
          val time = udf(CarsUtils.getTime3 _)
          // Rename the columns to match the MySQL table, then register a temp view
          frame
            .withColumnRenamed("carId", "car_id")
            .withColumnRenamed("plateNo", "car_number")
            .withColumnRenamed("@timestamp", "time")
            .withColumn("time", time($"time"))
            .createTempView("bd_position")
          // SQL that reads out all vehicles from the previous day
          val sql =
            """
              select car_id, car_number, time
              from bd_position
              group by car_number, time, car_id
            """
          //frame.printSchema()
          val result: DataFrame = spark.sql(sql)
          // Write out to the MySQL database
          result.write.mode(SaveMode.Append).format("jdbc")
            .option("url", "jdbc:mysql://10.1.97.7:3306/miracle_beidou?rewriteBatchedStatements=true")
            .option("dbtable", "t_table")
            .option("user", "root")
            .option("password", "root")
            .option("driver", "com.mysql.cj.jdbc.Driver")
            .option("batchsize", 5000)
            .save()
          logger.info("WRITE SUCCESS")
        } else {
          logger.error("WRITE FAILED")
        }
        spark.stop()
      }
    }
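    The program above leans on a CarsUtils helper class whose source is not shown in the article. A minimal sketch of what its two date helpers might look like, using java.time (the names mirror the calls above, but the actual implementation, time zone, and return types are assumptions):

```scala
import java.time.{Instant, LocalDate, ZoneId}
import java.time.format.DateTimeFormatter

// Sketch of the CarsUtils date helpers used by CarsOfDay; the real class is
// not shown in the article, so behavior here is an assumption.
object CarsUtilsSketch {
  private val zone = ZoneId.of("UTC")

  // Epoch millis at the start of yesterday (the article's getYesterDay).
  def getYesterDay: Long =
    LocalDate.now(zone).minusDays(1).atStartOfDay(zone).toInstant.toEpochMilli

  // Format an epoch-millis timestamp with the given pattern (getTime).
  def getTime(millis: Long, pattern: String): String =
    DateTimeFormatter.ofPattern(pattern).withZone(zone).format(Instant.ofEpochMilli(millis))
}
```

    With these two helpers, getTime(getYesterDay, "yyyy-MM-dd") produces exactly the date string the job appends to the HDFS path.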

    Issues to Note

    1. The job may run fine locally but fail with the following error when submitted to the cluster with spark-submit:

    Exception in thread "main" java.lang.IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

    The cause: the environment used in this article is Spark 2.x with Hadoop 3.x. Spark 2.x pulls in Hadoop 2.6.* jars by default; excluding those 2.6 jars and declaring the 3.x jars manually resolves the error. The configuration is as follows:

    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.2</version>
        <exclusions>
            <exclusion>
                <artifactId>hadoop-client</artifactId>
                <groupId>org.apache.hadoop</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    2. This article writes into a table that already exists, so withColumnRenamed is used to rename the DataFrame columns to match the table's columns; the column types must match as well. If the table does not exist, writing directly will create it automatically.

    3. Performance: the option("batchsize", 10000) setting can be used to speed up writes.

    4. The SaveMode of the write can be configured as needed; the available modes are:

    SaveMode    Description
    Append      Appends the new rows to the data already present.
    Overwrite   Overwrites the previous data, recreating it with the new write.
    Ignore      If data of this type already exists at the target, nothing is written; if not, it is created.