Flink 之 KeyedState 的使用

    技术2022-07-12  63

    场景:         不使用 Flink 提供的 sum() 方法,使用 KeyedState 完成对单词分组求和计算操作。

    代码: /**  * TODO 不使用 Flink 提供的sum()方法,对单词进行分组求和计算。  *  * @author liuzebiao  * @Date 2020-2-17 11:42  */ public class KeyedStateDemo {

        public static void main(String[] args) throws Exception {         //1.创建一个 flink steam 程序的执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            //2.使用StreamExecutionEnvironment创建DataStream         DataStreamSource<String> lines = env.socketTextStream( "localhost", 8888);

            //Transformation(s) 对数据进行处理操作         SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {             @Override             public Tuple2<String, Integer> map(String word) {                 //将每个单词与 1 组合,形成一个元组                 return Tuple2.of(word, 1);             }         });

            //进行分组聚合(keyBy:将key相同的分到一个组中)         KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = wordAndOne.keyBy(0);

            /** 使用 KeyedState 通过中间状态求和  ----- start ----**/         SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyedStream.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                //状态数据不参与序列化,添加 transient 修饰             private transient ValueState<Integer> valueState;

                @Override             public void open(Configuration parameters) throws Exception {                 //获取 KeyedState 数据                 ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("sum-key-state", Types.TUPLE(Types.STRING, Types.INT));

                    valueState = getRuntimeContext().getState(stateDescriptor);             }

                @Override             public Tuple2<String, Integer> map(Tuple2<String, Integer> tuple2) throws Exception {                 //输入的单词                 String word = tuple2.f0;                 //输入的次数                 Integer count = tuple2.f1;                 //根据State获取中间数据                 Integer historyVal = valueState.value();

                    //根据State中间数据,进行累加                 if (historyVal != null) {                     historyVal += count;                     //累加后,更新State数据                     valueState.update(historyVal);                 } else {                     valueState.update(count);                 }                 return Tuple2.of(word,valueState.value());             }         });

            /** 使用 KeyedState 通过中间状态求和  ----- end ----**/

            //Transformation 结束

            //3.调用Sink (Sink必须调用)         summed.print();

            //启动(这个异常不建议try...catch... 捕获,因为它会抛给上层flink,flink根据异常来做相应的重启策略等处理)         env.execute("KeyedStateDemo");     } }  

    Processed: 0.015, SQL: 9