Flink 1.10 demo

    Tech · 2023-11-14

The main job (apps/StreamingJob.java) reads "uid|ua" lines from a local socket, registers the stream as a table with a processing-time attribute, and concatenates the uids per ua over one-minute tumbling windows with a custom CONCAT_AGG aggregate function. The Kafka-related imports are not used by the socket version; they belong to an alternative source, sketched right after the class.

    package apps;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    import udf.ConcatAggFunction;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class StreamingJob {

        // private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

        public static void main(String[] args) throws Exception {
            // LOG.info("StreamingJob start ........");
            // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // StreamTableEnvironment tableEnv = TableEnvironment.create(env);

            // Blink planner in streaming mode (Flink 1.10).
            EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);

            // Register the custom aggregate function so SQL can call CONCAT_AGG.
            tableEnv.registerFunction("CONCAT_AGG", new ConcatAggFunction());

            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

            // Restart strategy: at most 5 attempts, 10 seconds apart.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10 * 1000));

            String checkpointPath = "hdfs://hz-cluster5/user/fs-checkpoints/cpk/";
            if (args.length > 0 && "local".equals(args[0])) {
                checkpointPath = "file:///D:/checkpoint/";
            }

            // Checkpoint state goes to a filesystem state backend; checkpoints run every
            // 60 s and are retained even when the job is cancelled.
            StateBackend fsStateBackend = new FsStateBackend(checkpointPath);
            env.setStateBackend(fsStateBackend);
            env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
            env.enableCheckpointing(60 * 1000).getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // Source: "uid|ua" lines from a local socket.
            DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);
            DataStream<Tuple2<String, String>> dataStream = text.flatMap(
                    new FlatMapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception {
                            String[] tokens = s.toLowerCase().split("\\|");
                            if (tokens.length >= 2) { // ignore malformed lines
                                collector.collect(new Tuple2<>(tokens[0], tokens[1]));
                            }
                        }
                    });

            // tableEnv.registerDataStream("batchService", dataStream, "uid,ua,proctime.proctime");
            tableEnv.createTemporaryView("batchService", dataStream, "uid,ua,proctime.proctime");

            // Concatenate all uids per ua within a one-minute tumbling processing-time window.
            String sqlt = "select CONCAT_AGG(uid) as var2, ua from batchService "
                    + "group by tumble(proctime, interval '1' MINUTE), ua";
            Table tapiResult = tableEnv.sqlQuery(sqlt);

            DataStream<Row> rdataStream = tableEnv.toAppendStream(tapiResult, Row.class);
            rdataStream.print();

            // Split the concatenated uids back into a list of "uid-ua" strings.
            rdataStream.flatMap(new FlatMapFunction<Row, List<String>>() {
                @Override
                public void flatMap(Row row, Collector<List<String>> collector) throws Exception {
                    String[] uids = row.getField(0).toString().split("\\|");
                    List<String> result = new ArrayList<>(uids.length);
                    for (String uid : uids) {
                        result.add(uid + "-" + row.getField(1));
                    }
                    collector.collect(result);
                }
            }).print();

            // execute program
            env.execute("Java from SocketTextStream Example");
        }
    }
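The unused Kafka imports (FlinkKafkaConsumer010, SimpleStringSchema, Properties) point at a Kafka-source variant of this job. A rough, hypothetical sketch of that variant as a drop-in replacement for the env.socketTextStream(...) line inside main; the broker address, group id and topic name are placeholders, not from the original post:

    // Hypothetical Kafka source: replaces env.socketTextStream("127.0.0.1", 9000).
    // Broker, group id and topic below are placeholders.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "concat-agg-demo");
    DataStream<String> text = env.addSource(
            new FlinkKafkaConsumer010<>("demo-topic", new SimpleStringSchema(), props));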
The accumulator used by the aggregate function (udf/ConcatAggSet.java) simply collects distinct values:

    package udf;

    import java.util.HashSet;
    import java.util.Set;

    // Accumulator for ConcatAggFunction: a set of distinct string values.
    public class ConcatAggSet {

        private Set<String> values;

        public ConcatAggSet() {
            this.values = new HashSet<>();
        }

        public Set<String> getValues() {
            return this.values;
        }

        public void addValue(String value) {
            this.values.add(value);
        }

        public void addAll(Set<String> values) {
            this.values.addAll(values);
        }

        public void clear() {
            this.values.clear();
        }
    }

The aggregate function itself (udf/ConcatAggFunction.java) joins the collected values with "|":

    package udf;

    import org.apache.flink.table.functions.AggregateFunction;

    import java.util.Iterator;

    // Aggregate function that concatenates the distinct values of a group with "|".
    public class ConcatAggFunction extends AggregateFunction<String, ConcatAggSet> {

        @Override
        public ConcatAggSet createAccumulator() {
            return new ConcatAggSet();
        }

        @Override
        public String getValue(ConcatAggSet accumulator) {
            return String.join("|", accumulator.getValues());
        }

        // accumulate, merge and resetAccumulator must be public instance methods
        // (not static) so the planner can invoke them on the function instance.
        public void accumulate(ConcatAggSet acc, String value) {
            acc.addValue(value);
        }

        public void merge(ConcatAggSet acc, Iterable<ConcatAggSet> it) {
            Iterator<ConcatAggSet> iter = it.iterator();
            while (iter.hasNext()) {
                ConcatAggSet a = iter.next();
                acc.addAll(a.getValues());
            }
        }

        public void resetAccumulator(ConcatAggSet acc) {
            acc.clear();
        }
    }
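To make the accumulator lifecycle concrete, the function can also be driven directly, the way a window aggregation would drive it. This is a minimal sketch; the class ConcatAggFunctionDemo and its sample values are made up for illustration:

    package udf;

    import java.util.Arrays;

    // Minimal sketch: exercises ConcatAggFunction outside of Flink to show the
    // accumulator lifecycle (create -> accumulate -> merge -> getValue).
    public class ConcatAggFunctionDemo {

        public static void main(String[] args) {
            ConcatAggFunction fn = new ConcatAggFunction();

            // One accumulator per (window, ua) group.
            ConcatAggSet acc = fn.createAccumulator();
            fn.accumulate(acc, "u1");
            fn.accumulate(acc, "u2");
            fn.accumulate(acc, "u1"); // duplicate, dropped by the HashSet

            // A second partial accumulator, merged in as Flink would do.
            ConcatAggSet other = fn.createAccumulator();
            fn.accumulate(other, "u3");
            fn.merge(acc, Arrays.asList(other));

            System.out.println(fn.getValue(acc)); // e.g. "u1|u2|u3" (set order is not guaranteed)
        }
    }

To try the job itself locally, pass local as the first program argument so checkpoints go to the local filesystem, start a listener on port 9000 before submitting (for example nc -lk 9000 with a typical netcat build), and type lines such as u1|chrome; after each one-minute window the job prints one row per ua with the pipe-concatenated uids.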

     
