Submitting Flink jobs from IDEA to a remote server

    2023-06-02

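    Flink itself provides an environment for submitting jobs to a remote cluster. It is created by the createRemoteEnvironment method of the StreamExecutionEnvironment class (Scala API), whose source is: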

        def createRemoteEnvironment(
            host: String,
            port: Int,
            parallelism: Int,
            jarFiles: String*): StreamExecutionEnvironment = {
          val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
          javaEnv.setParallelism(parallelism)
          new StreamExecutionEnvironment(javaEnv)
        }
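The Scala method above is a thin delegation layer: JavaEnv is the Java StreamExecutionEnvironment imported under an alias, the String* varargs are forwarded to it with `jarFiles: _*`, the parallelism is set on the Java environment, and the result is wrapped in the Scala StreamExecutionEnvironment. The sketch below reproduces only that pattern in plain Scala so it runs without Flink; `JavaRemoteEnv` and `ScalaRemoteEnv` are hypothetical stand-ins, not Flink classes.

```scala
// Hypothetical stand-in for the Java environment (NOT a Flink class).
class JavaRemoteEnv(val host: String, val port: Int, val jarFiles: Seq[String]) {
  var parallelism: Int = -1 // in this stub, -1 simply means "not set yet"
  def setParallelism(p: Int): Unit = { parallelism = p }
}

object JavaRemoteEnv {
  // Varargs factory with the same shape as JavaEnv.createRemoteEnvironment(host, port, jarFiles...)
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): JavaRemoteEnv =
    new JavaRemoteEnv(host, port, jarFiles.toList)
}

// Hypothetical Scala wrapper that only holds the Java environment.
class ScalaRemoteEnv(val javaEnv: JavaRemoteEnv)

object ScalaRemoteEnv {
  def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*): ScalaRemoteEnv = {
    // `: _*` expands the Seq back into varargs; passing jarFiles alone would not compile
    val javaEnv = JavaRemoteEnv.createRemoteEnvironment(host, port, jarFiles: _*)
    javaEnv.setParallelism(parallelism)
    new ScalaRemoteEnv(javaEnv)
  }
}
```

With the real Scala API the same shape means that a call such as `StreamExecutionEnvironment.createRemoteEnvironment("node02", 8081, 2, jarFiles)` selects this four-argument overload and submits with parallelism 2, while the three-argument form used in the example below leaves the parallelism at its default.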
    Example code for remote submission:
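        package com.flink.remotesubmit

        import org.apache.flink.streaming.api.scala._

        object RemoteSubmitApp {
          def main(args: Array[String]): Unit = {
            // REST endpoint (JobManager host and port) of the remote Flink cluster
            val host: String = "node02"
            val port: Int = 8081
            // Local path of the packaged job jar; it is uploaded to the cluster on submission
            val jarFiles = "E:\\CDHProjectDemo\\flink-demo\\target\\flink-demo-0.0.1-SNAPSHOT.jar"
            val env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles)

            // Source: read text lines from a socket on node01
            val socketHost: String = "node01"
            val socketPort: Int = 7777
            val socketDs: DataStream[String] = env.socketTextStream(socketHost, socketPort)

            // Word count: split into words, pair each word with 1,
            // group by the word and keep a running sum of the counts
            socketDs.flatMap(_.split(" "))
              .map((_, 1))
              .keyBy(_._1)
              .sum(1)
              .print()

            env.execute("Remote Submit Job")
          }
        }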
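To see what this job emits without a cluster, the sketch below replays the same transformations over an in-memory list of lines with plain Scala collections. It is an approximation, not Flink: it models keyBy followed by sum(1) as a rolling per-word counter that emits the updated (word, count) after every input record, which is how a keyed rolling sum behaves on an unbounded stream. The object and method names are hypothetical.

```scala
import scala.collection.mutable

object WordCountSimulation {
  // Replays flatMap -> map -> keyBy -> sum over a finite list of lines and
  // returns every (word, runningCount) record a keyed rolling sum would emit, in order.
  def simulateWordCount(lines: Seq[String]): List[(String, Int)] = {
    val counts = mutable.Map.empty[String, Int] // per-word state, playing the role of keyed state
    val out = mutable.ListBuffer.empty[(String, Int)]
    for {
      line <- lines
      word <- line.split(" ") // same split as flatMap(_.split(" "))
    } {
      val updated = counts.getOrElse(word, 0) + 1 // add the 1 from map((_, 1)) to this word's sum
      counts(word) = updated
      out += ((word, updated)) // a rolling sum emits a record for every input element
    }
    out.toList
  }
}
```

Assuming a socket server such as `nc -lk 7777` is running on node01, typing `hello flink` and then `hello world` should therefore yield records like (hello,1), (flink,1), (hello,2), (world,1). Because the job runs remotely, print() writes them to the TaskManagers' stdout (viewable in the Flink web UI), not to the IDEA console.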

    Notes:

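    1. Keep the code and the jar consistent: after changing the code, run mvn clean package again so that the jar passed to createRemoteEnvironment contains the updated classes.
    2. Add the relevant configuration files (core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml, etc.) to the project's src/main/resources directory.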
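Both notes can be checked before calling createRemoteEnvironment. The sketch below is a hypothetical helper, not part of Flink: `jarIsUpToDate` treats the jar as stale if it is missing or if any file under the source directory was modified after it, and `missingConfigs` lists which of the given config files cannot be found on the classpath (Maven copies src/main/resources there). Comparing modification times is a heuristic assumption and will not catch every kind of mismatch.

```scala
import java.io.File

object PreSubmitChecks {
  // Recursively collects regular files under `dir` (empty if the directory does not exist).
  private def allFiles(dir: File): Seq[File] = {
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    children.flatMap(f => if (f.isDirectory) allFiles(f) else Seq(f))
  }

  // Heuristic: the jar is up to date if it exists and no source file is newer than it.
  def jarIsUpToDate(jar: File, srcDir: File): Boolean =
    jar.isFile && allFiles(srcDir).forall(_.lastModified() <= jar.lastModified())

  // Returns the names of the config files that are not visible on the classpath.
  def missingConfigs(names: Seq[String]): Seq[String] =
    names.filter(n => getClass.getClassLoader.getResource(n) == null)
}
```

For example, checking `missingConfigs(Seq("core-site.xml", "hdfs-site.xml", "yarn-site.xml", "mapred-site.xml"))` and `jarIsUpToDate(...)` at the start of main, and stopping with an error when either check fails, avoids submitting an outdated jar or running without the cluster configuration.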
    Run results
