package DataSetPartitionTest1;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* @Author you guess
* @Date 2020/7/4 11:58
* @Version 1.0
* @Desc
*/
public class DataSetPartitionTest1 {

    /**
     * Runs four small DataSet jobs over the same in-memory data and prints, for each record, the
     * id of the thread that processed it, followed by the job's collected result. The jobs show
     * how range partitioning, partition-local sorting and parallelism interact.
     *
     * @param args unused
     * @throws Exception if any of the Flink jobs fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, String>> source2 = env.fromElements(
                Tuple2.of(2L, "xiaoli"),
                Tuple2.of(1L, "shinelon"),
                Tuple2.of(67L, "hhhhhh67"),
                Tuple2.of(2L, "xiaohong"),
                Tuple2.of(3L, "hhhhhh"),
                Tuple2.of(13L, "hhhhhh13"),
                Tuple2.of(51L, "hhhhhh51"),
                Tuple2.of(89L, "hhhhhh89"),
                Tuple2.of(23L, "hhhhhh23"),
                Tuple2.of(49L, "hhhhhh49"),
                Tuple2.of(3L, "hhhhhh37"));

        System.out.println("-----111111111111--------");
        // partitionByRange(0) range-partitions on field 0. All records with the same key land in
        // the same partition (handled by one thread), and each partition covers a contiguous
        // key range. Note: records are grouped by the key value itself, not by the key's hash.
        // sortPartition then sorts each partition on its own.
        source2.partitionByRange(0)
                .sortPartition(0, Order.DESCENDING)
                .map(new ThreadLoggingMapper(true))
                .print();

        System.out.println("-----222222222--------");
        // sortPartition only sorts inside each partition. The whole data set is ordered only
        // when the sort runs with parallelism 1, so set it explicitly instead of relying on the
        // environment's default parallelism.
        source2.sortPartition(0, Order.DESCENDING).setParallelism(1)
                .map(new ThreadLoggingMapper(false))
                .print();

        System.out.println("---3333333333---");
        // Sort by field 0 ascending, then by field 1 ascending (like SQL "ORDER BY c1, c2").
        // Chained sortPartition calls add keys to the same sort. Parallelism 1 makes the order
        // hold across the whole data set.
        source2.sortPartition(0, Order.ASCENDING)
                .sortPartition(1, Order.ASCENDING)
                .setParallelism(1)
                .print();

        System.out.println("---4444444---");
        // With parallelism 2, each of the two partitions is sorted on its own; the data set as a
        // whole is NOT ordered.
        source2.sortPartition(0, Order.DESCENDING).setParallelism(2)
                .map(new ThreadLoggingMapper(true))
                .print();
    }

    /**
     * Pass-through map that prints the id of the executing thread (optionally followed by the
     * record's key) and emits a copy of the record. The job output therefore shows which
     * parallel instance handled each record.
     *
     * <p>The output type is declared as {@code Tuple2<Long, String>} (not {@code Object}) so Flink
     * keeps the tuple type information instead of falling back to a generic type.
     */
    private static final class ThreadLoggingMapper
            implements MapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {

        private static final long serialVersionUID = 1L;

        /** When true, append {@code ",<f0>"} to the printed thread id. */
        private final boolean printKey;

        ThreadLoggingMapper(boolean printKey) {
            this.printKey = printKey;
        }

        @Override
        public Tuple2<Long, String> map(Tuple2<Long, String> value) {
            String line = "线程id#" + Thread.currentThread().getId();
            System.out.println(printKey ? line + "," + value.f0 : line);
            // Emit a fresh tuple rather than the input instance, as the original code did.
            return Tuple2.of(value.f0, value.f1);
        }
    }
}
/*
-----111111111111--------
线程id#97,2
线程id#98,3
线程id#100,13
线程id#98,3
线程id#97,2
线程id#101,23
线程id#102,49
线程id#103,51
线程id#104,67
线程id#105,89
线程id#95,1
(1,shinelon)
(2,xiaoli)
(2,xiaohong)
(3,hhhhhh)
(3,hhhhhh37)
(13,hhhhhh13)
(23,hhhhhh23)
(49,hhhhhh49)
(51,hhhhhh51)
(67,hhhhhh67)
(89,hhhhhh89)
-----222222222--------
线程id#238
线程id#238
线程id#238
线程id#238
线程id#238
线程id#238
线程id#238
线程id#238
线程id#238
线程id#238
线程id#238
(89,hhhhhh89)
(67,hhhhhh67)
(51,hhhhhh51)
(49,hhhhhh49)
(23,hhhhhh23)
(13,hhhhhh13)
(3,hhhhhh)
(3,hhhhhh37)
(2,xiaoli)
(2,xiaohong)
(1,shinelon)
---3333333333---
(1,shinelon)
(2,xiaohong)
(2,xiaoli)
(3,hhhhhh)
(3,hhhhhh37)
(13,hhhhhh13)
(23,hhhhhh23)
(49,hhhhhh49)
(51,hhhhhh51)
(67,hhhhhh67)
(89,hhhhhh89)
---4444444---
线程id#403,67
线程id#406,89
线程id#403,51
线程id#403,23
线程id#403,3
线程id#403,3
线程id#403,2
线程id#406,49
线程id#406,13
线程id#406,2
线程id#406,1
(67,hhhhhh67)
(51,hhhhhh51)
(23,hhhhhh23)
(3,hhhhhh)
(3,hhhhhh37)
(2,xiaoli)
(89,hhhhhh89)
(49,hhhhhh49)
(13,hhhhhh13)
(2,xiaohong)
(1,shinelon)
*/
/*
Flink 1.9.2, Java 1.8
sortPartition的默认并行度是1,是全局有序;
或者
sortPartition(0, Order.DESCENDING).setParallelism(1) 并行度是1,也是全局有序。
Flink DataSet first groupBy sortGroup 用法 实例
*/