Flink WordCount Example


    pom.xml

    The complete pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.msb</groupId>
        <artifactId>StudyFlink</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <flink.version>1.9.2</flink.version>
            <scala.version>2.11.8</scala.version>
            <redis.version>3.2.0</redis.version>
            <hbase.version>1.3.3</hbase.version>
            <mysql.version>5.1.44</mysql.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <!-- flink-connector-kafka_2.11 was declared twice in the original pom; one copy is enough -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.5</version>
            </dependency>
            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>${redis.version}</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-filesystem_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <!-- When a Maven project mixes Java and Scala sources, configuring the
                     maven-scala-plugin compiles both so they can be packaged together -->
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- Plugin needed for Maven to build the jar -->
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>2.4</version>
                    <configuration>
                        <!-- Setting this to false strips the "-jar-with-dependencies" suffix
                             from a name like MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar -->
                        <!--<appendAssemblyId>false</appendAssemblyId>-->
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>assembly</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

    Scala code

    The complete code:

    package com.zxl.stream

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._

    object WordCount {
      def main(args: Array[String]): Unit = {
        // Prepare the environment.
        /**
         * createLocalEnvironment          creates a local execution environment
         * createLocalEnvironmentWithWebUI creates a local execution environment and also
         *                                 serves the Web UI on port 8081
         * getExecutionEnvironment         creates the context that matches where the job
         *                                 actually runs, e.g. local or cluster
         */
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        /**
         * DataStream: a stream made up of elements of one type.
         * If the source is a socket, its parallelism can only be 1.
         */
        val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
        val wordStream = initStream.flatMap(_.split(" ")).setParallelism(3)
        val pairStream = wordStream.map((_, 1)).setParallelism(3)
        val keyByStream = pairStream.keyBy(0)
        val restStream = keyByStream.sum(1).setParallelism(3)
        restStream.print()
        /**
         * 6> (msb,1)
         * 1> (,,1)
         * 3> (hello,1)
         * 3> (hello,2)
         * 6> (msb,2)
         * Computation is stateful by default.
         * "6>" marks which thread processed the record.
         * The same key is always processed by the same thread.
         */
        // Launch the Flink job.
        env.execute("first flink job")
      }
    }
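    A side note: `keyBy(0)` keys by tuple position, a form that later Flink releases deprecate in favor of key-selector functions. Below is a minimal sketch of the same job written with a key selector; it also compiles against Flink 1.9, and the object name `WordCountKeySelector` is made up here for illustration.

    package com.zxl.stream

    import org.apache.flink.streaming.api.scala._

    // Hypothetical variant of the WordCount above, keyed by a function
    // instead of a tuple index.
    object WordCountKeySelector {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.socketTextStream("node01", 8888)
          .flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(_._1) // key by the word itself rather than position 0
          .sum(1)      // still aggregates the count field by position
          .print()
        env.execute("word count with key selector")
      }
    }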

    Run and test

    Local run

    First start a listener on port 8888:

    nc -lk 8888

    Then run the main method.

    Type data in real time and the stream computation runs on it. Computation is stateful by default: the previous result is retained and updated.
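    For instance, typing lines such as the following into the nc terminal (an illustrative reconstruction, not the original session; the (,,1) entry in the output below came from a stray comma typed at some point):

    hello msb
    hello msb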

    6> (msb,1)
    1> (,,1)
    3> (hello,1)
    3> (hello,2)
    6> (msb,2)

    Computation is stateful by default. The "6>" prefix marks which thread processed the record; records with the same key are always handled by the same thread.

    More threads are not always better: with too many, starting them can take longer than the computation itself. With the parallelism set to 1, only one thread does the processing, and the thread-number prefix disappears from the output, as the sketch below shows.
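    A minimal sketch of the relevant lines inside main, assuming the same WordCount job as above:

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run every operator with a single parallel instance; print() output
    // then loses its "n>" thread prefix.
    env.setParallelism(1)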

    Running the jar on a cluster

    Build the jar with Maven's package phase, as shown below.
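    With the pom above, the build is the standard Maven invocation; the resulting jar names follow the artifactId and version:

    mvn clean package
    # target/ should then contain, among others:
    #   StudyFlink-1.0-SNAPSHOT.jar
    #   StudyFlink-1.0-SNAPSHOT-jar-with-dependencies.jar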

    Pick the plain jar, not the one with dependencies: the cluster environment already provides those jars, and bundling them again would only duplicate them.

    Submit the job from the command line

    Upload the jar to a node and run the following:

    flink run -c <main class> -d <path to jar>

    -c specifies the main class to run; -d submits the job in detached mode so it keeps running in the background.
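    For this project the submission would look roughly like this (the jar path is an assumption; adjust it to wherever you uploaded the jar):

    flink run -c com.zxl.stream.WordCount -d /path/to/StudyFlink-1.0-SNAPSHOT.jar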

    Open the Web UI and check the Running Jobs page. Send some data through nc, click into the job, and you can see the output.

    Submit the job via the Web UI

    Job submission through the Web UI can be disabled; it defaults to true (enabled):

    vim conf/flink-conf.yaml

    web.submit.enable: false   # disable job submission via the Web UI
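    A restart of the cluster is needed for the change to take effect; with a standalone setup (scripts relative to the Flink home):

    bin/stop-cluster.sh
    bin/start-cluster.sh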

    View the logs
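    In a standalone cluster, records emitted with print() end up in the TaskManager's .out file under Flink's log directory. One way to follow it (the file-name pattern includes user and host, so the glob here is an assumption about your setup):

    # Follow the TaskManager stdout, where print() output lands
    tail -f log/flink-*-taskexecutor-*.out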
