Flink 1.10 from Zero to One (Part 3): WordCount


    1. Development Environment

            Language: Scala 2.11.12

            Apache Flink: 1.10

    2. Development Tools

            The official recommendation is IntelliJ IDEA, which integrates Scala and Maven support by default and is therefore more convenient, though Eclipse works as well. Flink programs can be written in Java, Python, or Scala; this series uses Scala, since functional-style code in Scala is more concise.

    3. Configuring Dependencies

            Developing a Flink application requires a minimal set of API dependencies; at minimum, flink-scala and flink-streaming-scala. Most applications also depend on specific connectors or other libraries, such as the Kafka connector, the Table API, or the CEP library. These are not part of Flink's core dependencies and therefore must be added to the application manually. The POM below also carries some Spark and other dependencies; pick and choose according to your own needs.

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11</scala.version>
        <spark.version>2.4.5</spark.version>
        <flink.version>1.10.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL on Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-scala_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- Flink Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Kafka connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Jedis client for Redis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.0.1</version>
        </dependency>
        <!-- Scala libraries -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.2.2</version>
            <classifier>jdk15</classifier>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
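            For the WordCount examples in this post alone, only a small subset of the POM above is strictly required. A minimal sketch, reusing the flink.version and scala.binary.version properties defined above (flink-clients is assumed so that jobs can be launched from the IDE):

    <!-- Minimal dependency set for these WordCount examples (a sketch;
         the full POM above additionally covers Spark, Kafka, Redis, etc.) -->
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- needed to run jobs from the IDE or submit them to a cluster -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>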

    4. Examples

    Example 1: Stream-processing WordCount on Flink 1.10 (real-time)

    Demo:

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

    /**
      * To test: run `nc -lk 8888` on hadoop003, then type words.
      */
    object FlinkStreamWordCount {
      def main(args: Array[String]): Unit = {
        // 1. Initialize the stream execution environment
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Set the parallelism
        streamEnv.setParallelism(12)
        // 2. Import the implicit conversions
        import org.apache.flink.streaming.api.scala._
        // 3. Read the data
        val stream: DataStream[String] = streamEnv.socketTextStream("hadoop003", 8888)
        // 4. Transformation and aggregation operators
        val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
          .map((_, 1))
          // grouping operator: 0 and 1 are tuple indices into the
          // DataStream[(String, Int)]; 0 is the word, 1 is its count
          .keyBy(0)
          .sum(1) // aggregation operator
        // 5. Print the result; the number printed in front of each record is the thread number
        result.print("Result")
        // 6. Start the streaming job
        streamEnv.execute("WordCount")
      }
    }

    Result:

    Notes:

    1. Install netcat on the server first: yum install nc

    2. The number printed in front of each record is the thread number.

            For example, in Result:10> (apache,1), 10 is the thread number.
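            As a variation on Example 1, the running totals can also be scoped to fixed windows, so that counts are emitted once per window instead of as an ever-growing sum. A minimal sketch, assuming the same hadoop003 socket source; the object name and the 5-second tumbling window length are illustrative choices:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    // Sketch: windowed variant of FlinkStreamWordCount above.
    object FlinkWindowWordCount {
      def main(args: Array[String]): Unit = {
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val stream: DataStream[String] = streamEnv.socketTextStream("hadoop003", 8888)
        stream.flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(0)                    // key by the word (tuple index 0)
          .timeWindow(Time.seconds(5)) // 5-second tumbling processing-time window
          .sum(1)                      // sum the counts within each window
          .print("Windowed")
        streamEnv.execute("WindowWordCount")
      }
    }

            With windows, each word's count resets every five seconds rather than accumulating forever.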

    Example 2: Batch-processing WordCount on Flink 1.10 (offline)

    Demo:

    import java.net.URL

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.ExecutionEnvironment

    object FlinkBatchWordCount {
      def main(args: Array[String]): Unit = {
        // Initialize the Flink batch execution environment
        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
        // Resolve the full file path from a classpath-relative resource
        val dataPath: URL = getClass.getResource("/WordCount.txt")
        // Flink's DataSet is roughly analogous to Spark's RDD
        val data: DataSet[String] = env.readTextFile(dataPath.getPath)
        data.flatMap(_.split(" "))
          .map((_, 1))
          .groupBy(0)
          .sum(1)
          .print()
      }
    }

    Result:
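            Example 2 resolves WordCount.txt from the classpath. As a variation, a minimal sketch that takes the input path as a program argument instead (the object name and argument handling are illustrative):

    import org.apache.flink.api.scala._

    // Sketch: same batch WordCount, input path passed as the first argument.
    object FlinkBatchWordCountArgs {
      def main(args: Array[String]): Unit = {
        require(args.nonEmpty, "usage: FlinkBatchWordCountArgs <input-path>")
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.readTextFile(args(0))
          .flatMap(_.split(" "))
          .map((_, 1))
          .groupBy(0) // group by the word (tuple index 0)
          .sum(1)     // sum the counts per word
          .print()    // print() triggers execution for batch jobs
      }
    }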

     

    Reflection:

            A day of labor earns a night of restful sleep; a lifetime of diligence earns a happy eternal rest.

