Flink 1.10
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
Kafka sink table defined with SQL DDL (the hyphenated table name has to be quoted with backticks):

CREATE TABLE `sink-hehe` (
  id STRING,
  name STRING,
  age INT,
  home STRING
) WITH (
  'connector.type' = 'kafka',                                   -- kafka connector
  'connector.version' = '0.11',                                 -- 'universal' supports Kafka 0.11 and above
  'connector.topic' = 'topic-stu',                              -- kafka topic
  'connector.startup-mode' = 'earliest-offset',                 -- read from the earliest offset
  'connector.properties.zookeeper.connect' = 'hadoop101:2181',  -- ZooKeeper address
  'connector.properties.bootstrap.servers' = 'hadoop101:9092',  -- broker address
  'format.type' = 'json'                                        -- the source data format is JSON
);
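As an illustration only (the class name, sample row, and job name below are placeholders, not taken from the original post), the same DDL can be registered and written to from a Java program with Flink 1.10's Blink planner; running it needs the flink-connector-kafka-0.11 and flink-json jars on the classpath:

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaSinkDdlExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // A tiny in-memory source standing in for the real student data (placeholder values).
        DataStream<Tuple4<String, String, Integer, String>> input =
                env.fromElements(Tuple4.of("1", "zhangsan", 20, "beijing"));
        tableEnv.createTemporaryView("students", input, "id, name, age, home");

        // Register the Kafka sink table with the DDL shown above.
        tableEnv.sqlUpdate(
                "CREATE TABLE `sink-hehe` (id STRING, name STRING, age INT, home STRING) WITH ("
                        + " 'connector.type' = 'kafka',"
                        + " 'connector.version' = '0.11',"
                        + " 'connector.topic' = 'topic-stu',"
                        + " 'connector.startup-mode' = 'earliest-offset',"
                        + " 'connector.properties.zookeeper.connect' = 'hadoop101:2181',"
                        + " 'connector.properties.bootstrap.servers' = 'hadoop101:9092',"
                        + " 'format.type' = 'json')");

        // Write the in-memory rows into the Kafka-backed table and run the job.
        tableEnv.sqlUpdate("INSERT INTO `sink-hehe` SELECT id, name, age, home FROM students");
        tableEnv.execute("write-students-to-kafka");
    }
}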
SQL Client environment file (YAML) defining a CSV source table, a view, a Kafka sink table, and a UDF:

tables:
  # CSV source table backed by the file system.
  - name: source-student
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/path/to/something.csv"
    format:
      type: csv
      fields:
        - name: id
          type: VARCHAR
        - name: name
          type: VARCHAR
        - name: age
          type: INT
        - name: home
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:
      - name: id
        type: VARCHAR
      - name: name
        type: VARCHAR
      - name: age
        type: INT
      - name: home
        type: VARCHAR

  # The entry below creates a view. For example, if the filter age > 18 is used often,
  # a view can be built from that condition; create as many views as you need.
  - name: MyCustomView
    type: view
    query: "SELECT * FROM `source-student` WHERE age > 18"

  # Sink table that writes to Kafka.
  - name: sink-kafkaTable
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: topic-stu
      properties:
        zookeeper.connect: hadoop101:2181
        bootstrap.servers: hadoop101:9092
        group.id: test
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: id
        type: VARCHAR
      - name: name
        type: VARCHAR
      - name: age
        type: INT
      - name: home
        type: VARCHAR

# Register a function: package it into a jar and put the jar under Flink's lib directory.
functions:
  - name: IsGroupUp
    from: class
    class: com.otis.udf.IsGroupUp

For the second type of window, the state grows without bound. You can set how long the state is retained, here with a minimum of one day and a maximum of two days; this is configured in code, as shown in the figure. It is a trade-off between state size and accuracy.
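The figure with that configuration is not reproduced here, so the following is only a minimal sketch of such a setting, using Flink 1.10's TableConfig.setIdleStateRetentionTime with the Blink planner (the class name is a placeholder):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Idle state is kept for at least 1 day and cleaned up after at most 2 days:
        // shorter retention keeps state small, longer retention keeps results more accurate.
        tableEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));
    }
}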
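Returning to the functions entry in the YAML above: the source of com.otis.udf.IsGroupUp is not shown in the post, so this is only a guess at its shape, assuming a scalar function that performs the same age > 18 check used in the view. Once packaged into a jar placed under Flink's lib directory (or passed to the SQL Client with --jar), it can be called in SQL as IsGroupUp(age):

package com.otis.udf;

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical implementation of the registered UDF; only the class name comes from the post.
public class IsGroupUp extends ScalarFunction {
    // Flink resolves eval methods by reflection: one INT argument, BOOLEAN result.
    public boolean eval(int age) {
        return age > 18;
    }
}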