Node assignment
Host        JobManager    TaskManager    ZooKeeper
hadoop01    √             √              √
hadoop02    √             √              √
hadoop03                  √              √
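This assignment maps onto Flink's conf/masters and conf/slaves files. A minimal sketch, assuming the default Web UI port 8081 that is also used later in this guide:

conf/masters (one JobManager per line, as host:webui-port):
hadoop01:8081
hadoop02:8081

conf/slaves (one TaskManager host per line):
hadoop01
hadoop02
hadoop03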
Reload the environment configuration file

source ~/.bash_profile

Copy zoo.cfg, hdfs-site.xml, and core-site.xml into the Flink configuration directory
cp $ZOOKEEPER_HOME/conf/zoo.cfg $FLINK_HOME/conf/
cp $HADOOP_HOME/etc/hadoop/hdfs-site.xml $FLINK_HOME/conf/
cp $HADOOP_HOME/etc/hadoop/core-site.xml $FLINK_HOME/conf/

All three machines must reload the environment configuration file
source ~/.bash_profile

If you changed the value of jobmanager.rpc.address earlier, set jobmanager.rpc.address in flink-conf.yaml on hadoop02 to hadoop02 (changing it on hadoop03 is optional). Only then can you observe the behavior of the high-availability cluster!
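For reference, the HA-related entries in flink-conf.yaml look roughly like the following sketch; the ZooKeeper port and the HDFS address/path are placeholders and must match your own zoo.cfg and core-site.xml:

high-availability: zookeeper
high-availability.zookeeper.quorum: hadoop01:2181,hadoop02:2181,hadoop03:2181
high-availability.storageDir: hdfs://hadoop01:9000/flink/ha/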
Start ZooKeeper, HDFS, and Flink, in that order
zkServer.sh start
start-dfs.sh
start-cluster.sh

Check the processes
jps
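For reference only — the exact names depend on the Hadoop and Flink versions and on which node hosts the NameNode — jps on hadoop01 should show processes roughly like:

QuorumPeerMain                       (ZooKeeper)
NameNode, DataNode                   (HDFS)
StandaloneSessionClusterEntrypoint   (Flink JobManager)
TaskManagerRunner                    (Flink TaskManager)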
View the Web UI at http://hadoop01:8081/

You can run one of the official examples as a test (the input file is the README.txt in the Flink installation directory)
flink run -m hadoop02:8081 \
  $FLINK_HOME/examples/batch/WordCount.jar
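Note that without program arguments WordCount.jar runs on its built-in sample data. To actually use the README.txt mentioned above as input, pass it explicitly; the output path below is only a placeholder and should point somewhere the TaskManagers can write to:

flink run -m hadoop02:8081 \
  $FLINK_HOME/examples/batch/WordCount.jar \
  --input $FLINK_HOME/README.txt \
  --output /tmp/wordcount-result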
At this point the cluster has been set up successfully!

Command to stop the cluster
stop-cluster.sh

Maven dependencies
<properties>
    <flink.version>1.7.2</flink.version>
    <hadoop.version>2.7.6</hadoop.version>
    <scala.version>2.11.8</scala.version>
</properties>

<dependencies>
    <!-- Flink core APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

WordCountJava.java
package wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * @Author Daniel
 * @Description Java version of the Flink word count program
 **/
public class WordCountJava {
    public static void main(String[] args) {
        // Entry point of the batch program
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        // Data source
        DataSource<String> dataSource = batchEnv.fromElements("hadoop hadoop",
                "spark spark spark", "flink flink flink");
        // flatMap operator: split one line into multiple words
        FlatMapOperator<String, String> wordDataSet = dataSource.flatMap((FlatMapFunction<String, String>) (value, out) -> {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(word);
            }
        }).returns(Types.STRING);
        // map operator: attach a count of 1 to every word
        MapOperator<String, Tuple2<String, Integer>> wordAndOneDataSet = wordDataSet
                .map((MapFunction<String, Tuple2<String, Integer>>) value -> new Tuple2<>(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by word and sum the counts
        AggregateOperator<Tuple2<String, Integer>> lastResult = wordAndOneDataSet.groupBy(0)
                .sum(1);
        try {
            // Sink: print the result
            lastResult.print();
            // batchEnv.execute("WordCountJava"); // not needed for batch jobs; required for streaming jobs
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
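With the sample data above (two "hadoop", three "spark", three "flink"), print() should emit something like the following; the order of the tuples is not guaranteed:

(flink,3)
(hadoop,2)
(spark,3)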
WordCountScala.scala

package wc

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}

/**
 * @Author Daniel
 * @Description Scala version of the Flink word count program
 **/
object WordCountScala {
  def main(args: Array[String]): Unit = {
    // Get the Flink streaming entry point
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Read a stream of data from a network socket
    val dS = streamEnv.socketTextStream("hadoop01", 9999)
    // Main business logic
    val resultDS = dS.flatMap(line => line.toString.split(" "))
      .map(word => Word(word, 1))
      .keyBy("word")
      .sum("count")
    // Output
    resultDS.print()
    // Launch the streaming job; it runs continuously
    streamEnv.execute("StreamWordCountScala")
  }
}

// A simple case class to model the data
case class Word(word: String, count: Int)

Send test data through netcat on hadoop01:

nc -lk hadoop01 9999
> hadoop hadoop spark spark spark flink flink flink flink
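Because keyBy plus sum emits an updated count for every incoming element, the job should print incremental results roughly like the sketch below; the "n>" prefix is the index of the printing subtask and will differ from run to run:

1> Word(hadoop,1)
1> Word(hadoop,2)
2> Word(spark,1)
2> Word(spark,2)
2> Word(spark,3)
3> Word(flink,1)
3> Word(flink,2)
3> Word(flink,3)
3> Word(flink,4)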