写在前面:我是「云祁」,一枚热爱技术、会写诗的大数据开发猿。昵称来源于王安石诗中一句 [ 云之祁祁,或雨于渊 ] ,甚是喜欢。 写博客一方面是对自己学习的一点点总结及记录,另一方面则是希望能够帮助更多对大数据感兴趣的朋友。如果你也对 数据中台、数据建模、数据分析以及 Flink/Spark/Hadoop/数仓开发 感兴趣,可以关注我 https://blog.csdn.net/BeiisBei ,让我们一起挖掘数据的价值~ 每天都要进步一点点,生命不是要超越别人,而是要超越自己! (ง •_•)ง
Table API 和 SQL 的程序结构,与流式处理的程序结构十分类似
val tableEnv = ... // 创建表的执行环境 // 创建一张表,用于读取数据 tableEnv.connect(...).createTemporaryTable("inputTable") // 创建一张表,用于把计算结构输出 tableEnv.connect(...).createTemporaryTable("outputTable") // 通过 Table API 查询算子,得到一张结果表 val result = tableEnv.from("inputTable").select(...) // 通过 SQL 查询语句,得到一张结果表 val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...") // 将结果表写入输出表中 result.insertInto("outputTable")TableEnvironment 是 flink 中集成Table API 和 SQL 的核心概念,所有对表的操作都基于 TableEnvironment
注册 Catalog在 Catalog 中注册表执行 SQL 查询注册用户自定义函数(UDF)不同处理环境的定义:
TableEnvironment 可以调用.connect() 方法,连接外部系统,并调用.createTemporaryTable() 方法,在 Catalog 中注册表
tableEnv .connect(...) // 定义表的数据来源,和外部系统建立连接 .withFormat(...) // 定义数据格式化方法 .withSchema(...) // 定义表结构 .createTemporaryTable("MyTable") // 创建临时表可以创建Table来描述文件数据,它可以从文件中读取,或者将数据写入文件
可以看到,我们从txt文件中读出六条数据,并以三元组的形式进行输出。
消费Kafka数据
测试结果如下:
true / false —> 表示数据是否是新增 or 撤回回收 。
SQL 查询示例:
将 DataStream 转换成表
对于一个DataStream,可以直接转换成Table,进而方便地调用 Table API 做转换操作 val dataStream:DataStream[SensorReading] = ... val sensorTable:Table = tableEnv.fromDataStream(dataStream) 默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来 val dataStream:DataStream[SensorReading] = ... val sensorTable = tableEnv.fromDataStream(dataStream, 'id,'timestamp,'temperature)数据类型与Schema的对应
DataStream 中的数据类型,与表的 Schema 之间的对应关系,可以有两种:基于字段名称,或者基于字段位置基于名称(name-based) val sensorTable = tableEnv.formDataStream( 'timestamp as 'ts,'id as 'myId,'temperature) 基于位置(position-based) val sensorTable = tableEnv.from创建临时视图(Temporary View)
基于 DataStream 创建临时视图 tableEnv.createTemporaryView("sensorView",dataStream) tableEnv.create 云 祁 认证博客专家 Flink Spark 数据中台 我是「云祁」,一枚热爱技术、会写诗的大数据开发猿,专注数据中台和 Flink / Spark / Hive 等大数据技术,欢迎一起交流学习。