java8的Stream流水线,用起来很爽,但是他是怎么做到的呢。
Stream流水线记录用户的每一步操作步骤(map,filter等),当用户调用结束操作(Collect,reduce 等)时将用户之前记录的操作一并执行。这里就有几个问题要解决了
如何记录用户操作如何将用户操作串联起来如何触发整个任务获取结果首先查看类图
查看源码我们可以看到 list.stream().map(x->x+“123”).filter(x->x.startsWith(“test”)).collect(Collectors.toList());
这样的流水线实际上每次都创建一个Stream的实现类并且返回,并且新创建的实现类持有上一个的引用。
中间操作分类
unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()等操作时无状态的 创建的是 StatelessOp对象distinct() sorted() sorted() limit() skip() 创建的是 StateFullOp对象以map为例看如何记录用户串联操作:
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }这里创建了一个 StatelessOp 对象 并返回,看构造函数的第一个参数 新创建的Stream对象,持有了原来对象的引用,这样就以链表的形势记录了用户操作。
sink
这里复写了opWrapSink方法,返回值为sink,sink接口是串联用户操作的关键:
sink 接口 主要有四个方法
begin 开始遍历元素之前回调end 元素遍历后回调cancellationRequested 是否可以结束操作,可以让短路操作尽早结束,比如findAny这种就不用遍历所有的元素。accept 接受一个元素,并对一个元进行处理 return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } };这里复写了 accept方法,对传入的元素 调用了用户传入的方法(mapper.apply(u)) ,并把其返回结果交个下游的sink 处理。这样就把用户的操作串联起来了。
看一个复杂的sink实现,
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { private ArrayList<T> list; RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { super(sink, comparator); } @Override public void begin(long size) { if (size >= Nodes.MAX_ARRAY_SIZE) throw new IllegalArgumentException(Nodes.BAD_SIZE); list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); //初始化一个list,用来存放元素 } @Override public void end() { //拿到所有元素之后开始排序 list.sort(comparator); downstream.begin(list.size()); //区分下游是否包含短路操作 if (!cancellationWasRequested) { //将排序好的元素发送到下游sink list.forEach(downstream::accept); } else { for (T t : list) { if (downstream.cancellationRequested()) break; downstream.accept(t); } } downstream.end(); list = null; } @Override public void accept(T t) { //接受一个元素,放入list list.add(t); } }终端操作最终会调用下面这个函数
@Override @SuppressWarnings("unchecked") //传入TerminalSink(最后一个sink收集结果) final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); //从后往前调用(前面说了当前Stream对象持有前一个Stream的引用)上面说的 opWrapSink 方法 得到 串联之后的 sink for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }最终对每个元素触发上面的到sink
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }TerminalSink 有以下3种
FindSink findAny optional 等操作 ,在对应的Sink中记录这个值,等到执行结束时返回。AccumulatingSink collect reduce等 归约操作,在sink 中 收集元素放入相应容器中。ForEachOp forEach ,在sink 中对元素回调 传入的 consumer 方法。stream.parallel()方法可以将流 转换会并行流,多个线程一起处理该流实现方式就是
java.util.Spliterator#trySplit() 方法,将流分成一个个子流,在不同的线程中执行。