pom
Complete pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0
</modelVersion>
<groupId>com.msb
</groupId>
<artifactId>StudyFlink
</artifactId>
<version>1.0-SNAPSHOT
</version>
<properties>
<flink.version>1.9.2
</flink.version>
<scala.version>2.11.8
</scala.version>
<redis.version>3.2.0
</redis.version>
<hbase.version>1.3.3
</hbase.version>
<mysql.version>5.1.44
</mysql.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-scala_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-streaming-scala_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-clients_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
<dependency>
<groupId>org.scala-lang
</groupId>
<artifactId>scala-library
</artifactId>
<version>${scala.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-connector-kafka_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop
</groupId>
<artifactId>hadoop-client
</artifactId>
<version>2.6.5
</version>
</dependency>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-connector-kafka_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.bahir
</groupId>
<artifactId>flink-connector-redis_2.11
</artifactId>
<version>1.0
</version>
</dependency>
<dependency>
<groupId>redis.clients
</groupId>
<artifactId>jedis
</artifactId>
<version>${redis.version}
</version>
</dependency>
<dependency>
<groupId>mysql
</groupId>
<artifactId>mysql-connector-java
</artifactId>
<version>${mysql.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.hbase
</groupId>
<artifactId>hbase-client
</artifactId>
<version>${hbase.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.hbase
</groupId>
<artifactId>hbase-common
</artifactId>
<version>${hbase.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.hbase
</groupId>
<artifactId>hbase-server
</artifactId>
<version>${hbase.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-connector-filesystem_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-statebackend-rocksdb_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-table-planner_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.flink
</groupId>
<artifactId>flink-table-api-scala-bridge_2.11
</artifactId>
<version>${flink.version}
</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools
</groupId>
<artifactId>maven-scala-plugin
</artifactId>
<version>2.15.2
</version>
<executions>
<execution>
<goals>
<goal>compile
</goal>
<goal>testCompile
</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin
</artifactId>
<version>2.4
</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies
</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly
</id>
<phase>package
</phase>
<goals>
<goal>assembly
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Scala code
Complete code
package com.zxl.stream

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
// The wildcard import brings in the implicit TypeInformation conversions
// required by the Scala DataStream API.
import org.apache.flink.streaming.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from the socket source node01:8888
    val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
    // Split each line into words
    val wordStream = initStream.flatMap(_.split(" ")).setParallelism(3)
    // Turn every word into a (word, 1) pair
    val pairStream = wordStream.map((_, 1)).setParallelism(3)
    // Partition the stream by the word (tuple field 0)
    val keyByStream = pairStream.keyBy(0)
    // Keep a running sum of the counts (tuple field 1) per word
    val restStream = keyByStream.sum(1).setParallelism(3)
    restStream.print()
    env.execute("first flink job")
  }
}
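As a side note, keyBy(0) keys by tuple position, which the Flink 1.9 Scala API supports but newer Flink versions deprecate. A minimal sketch of the same job using a key selector function instead (identical logic, only the keying style differs; the object name is made up for illustration):

package com.zxl.stream

import org.apache.flink.streaming.api.scala._

object WordCountKeySelector {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.socketTextStream("node01", 8888)
      .flatMap(_.split(" "))
      .map((_, 1))
      // Key by the word itself instead of the tuple index 0
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute("word count with a key selector")
  }
}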
Launch and test
Local run
First start a listener on port 8888:
nc -lk 8888
Run the main method.
Type data into the nc session and the job computes results on the stream in real time. The computation is stateful by default: the previous result for each word is retained and updated.
6> (msb,1)
1> (,,1)
3> (hello,1)
3> (hello,2)
6> (msb,2)
* The computation is stateful by default (the per-key state is sketched below).
* A prefix such as "6>" indicates which thread (parallel subtask) processed the record.
* Records with the same key are always processed by the same thread.
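To make the stateful point concrete, here is a rough sketch of roughly what the built-in sum(1) keeps for each key: a running count stored in Flink keyed state. This is only an illustration under the same Flink 1.9.2 Scala setup as above (the class name RunningCount is made up), not the code used in the job:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Keeps a per-key running count in keyed ValueState, roughly what sum(1) does internally.
class RunningCount extends RichFlatMapFunction[(String, Int), (String, Int)] {
  private var countState: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    countState = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("running-count", classOf[Int]))
  }

  override def flatMap(in: (String, Int), out: Collector[(String, Int)]): Unit = {
    // For an Int-typed state, a missing value unboxes to 0 on the Scala side
    val updated = countState.value() + in._2
    countState.update(updated)
    out.collect((in._1, updated))
  }
}

// Hypothetical usage: pairStream.keyBy(_._1).flatMap(new RunningCount).print()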
More threads are not always better: with too many of them, the time spent starting threads can exceed the time spent on the actual computation. With a parallelism of 1, only one thread is started to do the processing, and the output lines no longer carry a thread-number prefix.
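A minimal sketch of the same job forced onto a single thread, assuming the same node01:8888 source as above (only the parallelism settings differ; the object name is made up):

package com.zxl.stream

import org.apache.flink.streaming.api.scala._

object WordCountSingleThread {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // One parallel subtask for every operator; print() output then has no "n>" prefix.
    env.setParallelism(1)

    env.socketTextStream("node01", 8888)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("word count with parallelism 1")
  }
}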
Running the jar on a cluster
Build the jar with the package phase (mvn package).
Pick the plain jar, not the jar-with-dependencies one: the cluster already provides those dependencies, so bundling them again would only duplicate them.
Submitting the job from the command line
Upload the jar to a node and run the following command:
-c specifies the main class; -d runs the job in detached mode
flink run -c <main class> -d <path to jar>
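For example, with the main class from the code above and the default Maven jar name for this project (an assumption based on the artifactId and version in the pom):
flink run -c com.zxl.stream.WordCount -d StudyFlink-1.0-SNAPSHOT.jar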
In the web UI, open Running Jobs, send some data through nc, click into the job, and you can see the output.
Submitting jobs through the web UI
Submission through the web UI can be disabled; it is enabled (true) by default:
vim conf/flink-conf.yaml
web.submit.enable: false
Viewing logs