推荐一篇博文,很好的介绍了Stream的原理.本文对其进行一些补充更加详细的讲解.
作者: 李豪
地址: https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/6-Stream%20Pipelines.md
需求:
从"张三","李四","王二","张四五"中选出以张开头的名字,然后从再从中选出名字最长的一个,输出其长度.
1.一种直白的实现

缺点:
- 迭代次数过多
- 频繁产生中间结果,性能无法接受
实际想要的效果:
平常的写法:
| 12
 3
 4
 5
 6
 7
 8
 
 | int longest = 0;for(String str : strings){
 if(str.startsWith("张")){
 int len = str.length();
 longest = Math.max(len, longest);
 }
 }
 System.out.println(longest);
 
 | 
Stream的做法:
| 12
 3
 4
 5
 
 | Stream.of("张三","李四","王二","张四五").filter(x -> x.startsWith("张"))
 .mapToInt(String::length)
 .max()
 .ifPresent(System.out::println);
 
 | 
2.Stream是怎么做到的?
Stream的操作分类:
中间操作:返回一个新的Stream
        - 有状态 sorted(),必须等上一步操作完拿到全部元素后才可操作
        - 无状态 filter(),该操作的元素不受上一步操作的影响
| 12
 
 | list.stream().filter(x -> x.startWith("张").map(x -> x.length())list.stream().filter(x -> x.startWith("张").sorted().map(x -> x.length())
 
 | 
终端操作:返回结果
        - 短路操作findFirst(),找到一个则返回,也就是break当前的循环
        - 非短路操作forEach(),遍历全部元素
以上操作决定了Stream一定是先构建完毕再执行的特点,也就是延迟执行,当需要结果(终端操作时)开始执行流水线.
Stream做到的是对于多次调用合并到一次迭代中处理完所有的调用方式.换句话说就是解决了上述的两个缺点.大概思路是记录下每一步的操作,然后终端操作时对其迭代依次执行每一步的操作,最后再一次循环中处理.
问题:
- 操作是如何记录下来的?
- 操作是如何叠加的?
- 叠加完如何执行的?
- 执行完如何收集结果的?
Stream结构示意图:

示例代码:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | List<String> data = new ArrayList<>();data.add("张三");
 data.add("李四");
 data.add("王三");
 data.add("马六");
 
 data.stream()
 .filter(x -> x.length() == 2)
 .map(x -> x.replace("三","五"))
 .sorted()
 .filter(x -> x.contains("五"))
 .forEach(System.out::println);
 
 | 
1. 操作是如何记录下来的?
- Head记录Stream起始操作
- StatelessOp记录中间操作
- StatefulOp记录有状态的中间操作
 这三个操作实例化会指向其父类AbstractPipeline,也就是在AbstractPipeline中建立了双向链表
对于Head
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | AbstractPipeline(Spliterator<?> source,int sourceFlags, boolean parallel) {
 this.previousStage = null;
 this.sourceSpliterator = source;
 this.sourceStage = this;
 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
 this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
 this.depth = 0;
 this.parallel = parallel;
 }
 
 | 
对于其他Stage:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {if (previousStage.linkedOrConsumed)
 throw new IllegalStateException(MSG_STREAM_LINKED);
 previousStage.linkedOrConsumed = true;
 
 previousStage.nextStage = this;
 this.previousStage = previousStage;
 this.sourceStage = previousStage.sourceStage;
 this.depth = previousStage.depth + 1;
 
 this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
 this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
 if (opIsStateful())
 sourceStage.sourceAnyStateful = true;
 }
 
 | 

调用过程如此用双向链表串联起来,每一步都得知其上一步与下一步的操作.
 data.stream()
 .filter(x -> x.length() == 2)
 .map(x -> x.replace("三","五"))
 .sorted()
 .filter(x -> x.contains("五"))
 .forEach(System.out::println);
2.操作是如何叠加的?
Sink<T>接口:
- void begin(long size),循环开始前调用,通知每个Stage做好准备
- void end(),循环结束时调用,依次调用每个Stage的end方法,处理结果
- boolean cancellationRequested(),判断是否可以提前结束循环
- void accept(T value),每一步的处理
其子类之一ChainedReference:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 
 | static abstract class ChainedReference<T, E_OUT> implements Sink<T> {protected final Sink<? super E_OUT> downstream;
 
 public ChainedReference(Sink<? super E_OUT> downstream) {
 this.downstream = Objects.requireNonNull(downstream);
 }
 @Override
 public void begin(long size) {
 downstream.begin(size);
 }
 @Override
 public void end() {
 downstream.end();
 }
 @Override
 public boolean cancellationRequested() {
 return downstream.cancellationRequested();
 }
 }
 
 | 
例Filter:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 
 | @Overridepublic final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
 Objects.requireNonNull(predicate);
 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 StreamOpFlag.NOT_SIZED) {
 @Override
 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
 @Override
 public void begin(long size) {
 downstream.begin(-1);
 }
 
 @Override
 public void accept(P_OUT u) {
 
 
 if (predicate.test(u))
 downstream.accept(u);
 }
 };
 }
 };
 }
 
 | 
再例如sorted():
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 
 | @Overridepublic void begin(long size) {
 if (size >= Nodes.MAX_ARRAY_SIZE)
 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
 }
 @Override
 public void end() {
 list.sort(comparator);
 downstream.begin(list.size());
 if (!cancellationWasRequested) {
 list.forEach(downstream::accept);
 }
 else {
 for (T t : list) {
 if (downstream.cancellationRequested()) break;
 downstream.accept(t);
 }
 }
 downstream.end();
 list = null;
 }
 @Override
 public void accept(T t) {
 list.add(t);
 }
 
 | 

叠加后如何执行?
执行操作是由终端操作来触发的,例如foreach操作
| 12
 3
 4
 5
 
 | @Overridepublic void forEach(Consumer<? super P_OUT> action) {
 
 evaluate(ForEachOps.makeRef(action, false));
 }
 
 | 
执行前会对操作从末尾到起始反向包裹起来,得到调用链
| 1
 | Sink opWrapSink(int flags, Sink<P_OUT> sink) ;
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
 Objects.requireNonNull(sink);
 
 for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
 sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
 }
 return (Sink<P_IN>) sink;
 }
 
 | 

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 
 | @Overridefinal <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
 Objects.requireNonNull(wrappedSink);
 
 if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
 
 wrappedSink.begin(spliterator.getExactSizeIfKnown());
 spliterator.forEachRemaining(wrappedSink);
 wrappedSink.end();
 }
 else {
 copyIntoWithCancel(wrappedSink, spliterator);
 }
 }
 
 | 
有状态的中间操作何时执行?
例如sorted()操作,其依赖上一次操作的结果集,按照调用链来说结果集必须在accept()调用完才会产生.那也就说明sorted操作需要在end中,然后再重新开启调用链.
sorted的end方法:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | @Overridepublic void end() {
 list.sort(comparator);
 downstream.begin(list.size());
 if (!cancellationWasRequested) {
 list.forEach(downstream::accept);
 }
 else {
 for (T t : list) {
 if (downstream.cancellationRequested()) break;
 downstream.accept(t);
 }
 }
 downstream.end();
 list = null;
 }
 
 | 
那么就相当于sorted给原有操作断路了一次,然后又重新接上,再次遍历.

如何收集到结果?
foreach是不需要收集到结果的,但是对于collect这样的操作是需要拿到最终end产生的结果.end产生的结果在最后一个Sink中,这样的操作最终都会提供一个取出数据的get方法.
| 12
 3
 4
 5
 
 | @Overridepublic <P_IN> R evaluateSequential(PipelineHelper<T> helper,
 Spliterator<P_IN> spliterator) {
 return helper.wrapAndCopyInto(makeSink(), spliterator).get();
 }
 
 | 
如此拿到数据返回
个人博客 mrdear.cn ,欢迎交流