flink transformation聚合算子(reduce,max和maxby等)

    技术2022-07-14  76

    1、Reduce

    通过reduce可以实现average, sum, min, max, count 等功能 ,reduce第一个参数以reduce操作结果, 第二个参数是当前元素。reduce每传入一个元素生成一个新的元素

    下面例子的功能:统计相同名称产品价格总和

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Produce> ds = env.fromCollection(StudentUtil.getStudentList()); //根据产品名称分组 KeyedStream<Produce, String> ks = ds.keyBy(new KeySelector<Produce, String>() { @Override public String getKey(Produce s) throws Exception { return s.getName(); } }); SingleOutputStreamOperator<Produce> reds = ks.reduce(new ReduceFunction<Produce>() { @Override public Produce reduce(Produce s, Produce t1) throws Exception { //相同名称的产品价格求和(average, sum, min, max, count等功能只需要在这个方法中实现) return new Produce(s.getName(), s.getPrice() + t1.getPrice()); } }).filter(new FilterFunction<Produce>() { //filter 做了个简单的输出 @Override public boolean filter(Produce s) throws Exception { System.out.println(String.format("输出结果 name = %s, price = %d", s.getName(), s.getPrice())); return true; } }); try { env.execute("测试 reduce"); } catch (Exception e) { e.printStackTrace(); }

    输入数据

    输出结果 name = name2, price = 2 输出结果 name = name2, price = 4 输出结果 name = name4, price = 4 输出结果 name = name0, price = 3 输出结果 name = name4, price = 3 输出结果 name = name2, price = 4 输出结果 name = name2, price = 1 输出结果 name = name0, price = 0 输出结果 name = name1, price = 3 输出结果 name = name4, price = 1

    输出结果

    输出结果 name = name2, price = 2 输出结果 name = name2, price = 6 输出结果 name = name4, price = 4 输出结果 name = name0, price = 3 输出结果 name = name4, price = 7 输出结果 name = name2, price = 10 输出结果 name = name2, price = 11 输出结果 name = name0, price = 3 输出结果 name = name1, price = 3 输出结果 name = name4, price = 8

    2、Aggregations

    DataStream API 提供了多种聚合方式,例如 min,max,sum 等。这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。

    *注意:maxby 返回最大值所在的那条数据,max是将最大值赋予当前数据所对应的属性,返回当前数据,min和minby同理

    代码如下:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //Tuple3Util.getTuple2List()是获取一个集合 DataStream<Tuple3<String, Integer, String>> ds = env.fromCollection(Tuple3Util.getTuple2List()); ds.keyBy(0).sum(1).filter(new FilterFunction<Tuple3<String, Integer, String>>() { //只用于输出 @Override public boolean filter(Tuple3<String, Integer, String> t) throws Exception { System.out.println(String.format("产品名称 = %s, 产品价格 = %d, 产品备注 = %s", t.f0, t.f1, t.f2)); return true; } }); env.execute("test Aggregations");

    1、maxby 方式聚合

    一、输入数据:

    产品名称 = name2, 产品价格 = 0, 产品备注 = 备注 = 0 产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5 产品名称 = name4, 产品价格 = 5, 产品备注 = 备注 = 8 产品名称 = name0, 产品价格 = 3, 产品备注 = 备注 = 9 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 3 产品名称 = name1, 产品价格 = 0, 产品备注 = 备注 = 1 产品名称 = name3, 产品价格 = 0, 产品备注 = 备注 = 8 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 5 产品名称 = name1, 产品价格 = 9, 产品备注 = 备注 = 9

    二、输出结果

    产品名称 = name2, 产品价格 = 0, 产品备注 = 备注 = 0 产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5 产品名称 = name4, 产品价格 = 5, 产品备注 = 备注 = 8 产品名称 = name0, 产品价格 = 3, 产品备注 = 备注 = 9 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9 产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5 产品名称 = name3, 产品价格 = 0, 产品备注 = 备注 = 8 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 5 产品名称 = name1, 产品价格 = 9, 产品备注 = 备注 = 9

    2、max方式

    一、输入数据:

    产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name0, 产品价格 = 0, 产品备注 = 备注 = 6 产品名称 = name4, 产品价格 = 1, 产品备注 = 备注 = 3 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 6 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 7 产品名称 = name0, 产品价格 = 6, 产品备注 = 备注 = 9 产品名称 = name1, 产品价格 = 3, 产品备注 = 备注 = 4 产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7 产品名称 = name0, 产品价格 = 0, 产品备注 = 备注 = 1 产品名称 = name3, 产品价格 = 1, 产品备注 = 备注 = 1

    二、输出结果

    产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name4, 产品价格 = 1, 产品备注 = 备注 = 3 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 6 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name1, 产品价格 = 3, 产品备注 = 备注 = 4 产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7

    Processed: 0.015, SQL: 9