Flink DataSet partitionByRange and sortPartition Usage Examples


package DataSetPartitionTest1;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * @Author you guess
 * @Date 2020/7/4 11:58
 * @Version 1.0
 * @Desc
 */
public class DataSetPartitionTest1 {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, String>> source2 = env.fromElements(
                Tuple2.of(2L, "xiaoli"),
                Tuple2.of(1L, "shinelon"),
                Tuple2.of(67L, "hhhhhh67"),
                Tuple2.of(2L, "xiaohong"),
                Tuple2.of(3L, "hhhhhh"),
                Tuple2.of(13L, "hhhhhh13"),
                Tuple2.of(51L, "hhhhhh51"),
                Tuple2.of(89L, "hhhhhh89"),
                Tuple2.of(23L, "hhhhhh23"),
                Tuple2.of(49L, "hhhhhh49"),
                Tuple2.of(3L, "hhhhhh37"));

        System.out.println("-----111111111111--------");
        // partitionByRange splits the data by ranges of the key value, so records with
        // the same key land in the same partition (and are processed by the same thread).
        // Note: partitioning is by the key value itself, not by the key's hash value.
        source2.partitionByRange(0).sortPartition(0, Order.DESCENDING).map(new MapFunction<Tuple2<Long, String>, Object>() {
            @Override
            public Object map(Tuple2<Long, String> value) throws Exception {
                System.out.println("thread id#" + Thread.currentThread().getId() + "," + value.f0);
                return new Tuple2<Long, String>(value.f0, value.f1);
            }
        }).print();

        System.out.println("-----222222222--------");
        // By default sortPartition is handled by a single thread here, so the sort is
        // effectively global and the whole data set comes out ordered.
        source2.sortPartition(0, Order.DESCENDING).map(new MapFunction<Tuple2<Long, String>, Object>() {
            @Override
            public Object map(Tuple2<Long, String> value) throws Exception {
                System.out.println("thread id#" + Thread.currentThread().getId());
                return new Tuple2<Long, String>(value.f0, value.f1);
            }
        }).print();

        System.out.println("---3333333333---");
        // Sort by the first field ascending, then by the second field ascending,
        // i.e. ORDER BY c1, c2; ASC is the default order.
        source2.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();

        System.out.println("---4444444---");
        // With parallelism set to 2, each partition is sorted on its own;
        // the data set as a whole is not globally ordered.
        source2.sortPartition(0, Order.DESCENDING).setParallelism(2).map(new MapFunction<Tuple2<Long, String>, Object>() {
            @Override
            public Object map(Tuple2<Long, String> value) throws Exception {
                System.out.println("thread id#" + Thread.currentThread().getId() + "," + value.f0);
                return new Tuple2<Long, String>(value.f0, value.f1);
            }
        }).print();
    }
}

/*
-----111111111111--------
thread id#97,2
thread id#98,3
thread id#100,13
thread id#98,3
thread id#97,2
thread id#101,23
thread id#102,49
thread id#103,51
thread id#104,67
thread id#105,89
thread id#95,1
(1,shinelon)
(2,xiaoli)
(2,xiaohong)
(3,hhhhhh)
(3,hhhhhh37)
(13,hhhhhh13)
(23,hhhhhh23)
(49,hhhhhh49)
(51,hhhhhh51)
(67,hhhhhh67)
(89,hhhhhh89)
-----222222222--------
thread id#238
thread id#238
thread id#238
thread id#238
thread id#238
thread id#238
thread id#238
thread id#238
thread id#238
thread id#238
thread id#238
(89,hhhhhh89)
(67,hhhhhh67)
(51,hhhhhh51)
(49,hhhhhh49)
(23,hhhhhh23)
(13,hhhhhh13)
(3,hhhhhh)
(3,hhhhhh37)
(2,xiaoli)
(2,xiaohong)
(1,shinelon)
---3333333333---
(1,shinelon)
(2,xiaohong)
(2,xiaoli)
(3,hhhhhh)
(3,hhhhhh37)
(13,hhhhhh13)
(23,hhhhhh23)
(49,hhhhhh49)
(51,hhhhhh51)
(67,hhhhhh67)
(89,hhhhhh89)
---4444444---
thread id#403,67
thread id#406,89
thread id#403,51
thread id#403,23
thread id#403,3
thread id#403,3
thread id#403,2
thread id#406,49
thread id#406,13
thread id#406,2
thread id#406,1
(67,hhhhhh67)
(51,hhhhhh51)
(23,hhhhhh23)
(3,hhhhhh)
(3,hhhhhh37)
(2,xiaoli)
(89,hhhhhh89)
(49,hhhhhh49)
(13,hhhhhh13)
(2,xiaohong)
(1,shinelon)
*/

Environment: Flink 1.9.2, Java 1.8.
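For contrast, here is a minimal sketch that is not part of the original example: it swaps partitionByRange for partitionByHash. Both operators keep equal keys in the same partition, but hash partitioning assigns keys by hash bucket rather than by contiguous value ranges, so neighbouring key values can end up in different partitions. The class name PartitionByHashSketch and the smaller input set are illustrative only.

package DataSetPartitionTest1;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class PartitionByHashSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, String>> data = env.fromElements(
                Tuple2.of(2L, "xiaoli"), Tuple2.of(1L, "shinelon"),
                Tuple2.of(3L, "hhhhhh"), Tuple2.of(2L, "xiaohong"));

        // Equal keys (the two 2L records) still share a partition, but which partition
        // a key goes to depends on its hash, not on a contiguous key range.
        data.partitionByHash(0)
            .sortPartition(0, Order.DESCENDING) // sorts within each hash partition only
            .print();
    }
}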

In this job sortPartition runs with a default parallelism of 1, so the result is globally ordered; calling sortPartition(0, Order.DESCENDING).setParallelism(1) fixes the parallelism at 1 explicitly and is likewise globally ordered.
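The two equivalent forms as a short sketch, assuming the source2 data set from the listing above:

// Sketch only: both variants keep the sort in a single partition, so the printed
// output is globally ordered.
source2.sortPartition(0, Order.DESCENDING).print();                   // default parallelism
source2.sortPartition(0, Order.DESCENDING).setParallelism(1).print(); // explicit parallelism 1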

