下面例子的功能:统计相同名称产品价格总和
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Produce> ds = env.fromCollection(StudentUtil.getStudentList()); //根据产品名称分组 KeyedStream<Produce, String> ks = ds.keyBy(new KeySelector<Produce, String>() { @Override public String getKey(Produce s) throws Exception { return s.getName(); } }); SingleOutputStreamOperator<Produce> reds = ks.reduce(new ReduceFunction<Produce>() { @Override public Produce reduce(Produce s, Produce t1) throws Exception { //相同名称的产品价格求和(average, sum, min, max, count等功能只需要在这个方法中实现) return new Produce(s.getName(), s.getPrice() + t1.getPrice()); } }).filter(new FilterFunction<Produce>() { //filter 做了个简单的输出 @Override public boolean filter(Produce s) throws Exception { System.out.println(String.format("输出结果 name = %s, price = %d", s.getName(), s.getPrice())); return true; } }); try { env.execute("测试 reduce"); } catch (Exception e) { e.printStackTrace(); }输入数据
输出结果 name = name2, price = 2 输出结果 name = name2, price = 4 输出结果 name = name4, price = 4 输出结果 name = name0, price = 3 输出结果 name = name4, price = 3 输出结果 name = name2, price = 4 输出结果 name = name2, price = 1 输出结果 name = name0, price = 0 输出结果 name = name1, price = 3 输出结果 name = name4, price = 1输出结果
输出结果 name = name2, price = 2 输出结果 name = name2, price = 6 输出结果 name = name4, price = 4 输出结果 name = name0, price = 3 输出结果 name = name4, price = 7 输出结果 name = name2, price = 10 输出结果 name = name2, price = 11 输出结果 name = name0, price = 3 输出结果 name = name1, price = 3 输出结果 name = name4, price = 8DataStream API 提供了多种聚合方式,例如 min,max,sum 等。这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。
*注意:maxby 返回最大值所在的那条数据,max是将最大值赋予当前数据所对应的属性,返回当前数据,min和minby同理
代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //Tuple3Util.getTuple2List()是获取一个集合 DataStream<Tuple3<String, Integer, String>> ds = env.fromCollection(Tuple3Util.getTuple2List()); ds.keyBy(0).sum(1).filter(new FilterFunction<Tuple3<String, Integer, String>>() { //只用于输出 @Override public boolean filter(Tuple3<String, Integer, String> t) throws Exception { System.out.println(String.format("产品名称 = %s, 产品价格 = %d, 产品备注 = %s", t.f0, t.f1, t.f2)); return true; } }); env.execute("test Aggregations");一、输入数据:
产品名称 = name2, 产品价格 = 0, 产品备注 = 备注 = 0 产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5 产品名称 = name4, 产品价格 = 5, 产品备注 = 备注 = 8 产品名称 = name0, 产品价格 = 3, 产品备注 = 备注 = 9 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 3 产品名称 = name1, 产品价格 = 0, 产品备注 = 备注 = 1 产品名称 = name3, 产品价格 = 0, 产品备注 = 备注 = 8 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 5 产品名称 = name1, 产品价格 = 9, 产品备注 = 备注 = 9二、输出结果
产品名称 = name2, 产品价格 = 0, 产品备注 = 备注 = 0 产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5 产品名称 = name4, 产品价格 = 5, 产品备注 = 备注 = 8 产品名称 = name0, 产品价格 = 3, 产品备注 = 备注 = 9 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9 产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5 产品名称 = name3, 产品价格 = 0, 产品备注 = 备注 = 8 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 5 产品名称 = name1, 产品价格 = 9, 产品备注 = 备注 = 9一、输入数据:
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name0, 产品价格 = 0, 产品备注 = 备注 = 6 产品名称 = name4, 产品价格 = 1, 产品备注 = 备注 = 3 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 6 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 7 产品名称 = name0, 产品价格 = 6, 产品备注 = 备注 = 9 产品名称 = name1, 产品价格 = 3, 产品备注 = 备注 = 4 产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7 产品名称 = name0, 产品价格 = 0, 产品备注 = 备注 = 1 产品名称 = name3, 产品价格 = 1, 产品备注 = 备注 = 1二、输出结果
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name4, 产品价格 = 1, 产品备注 = 备注 = 3 产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 6 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name1, 产品价格 = 3, 产品备注 = 备注 = 4 产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7 产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0 产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7