学东西的时候最好是理论先行,为什么?没有理论,想当然的去干,干好了是 瞎猫碰上死耗子,干不好就瞎干,浪费时间,只会弄得身心俱疲。
可是在真正的工作中,很少工作会允许你先弄清原理再去实操。但是不管怎么说,欠下的债终究是需要还的。
今天咱们的主题是  stream . 咱们就从 Stream  的 "道,术,法,器" 四个阶段来聊好好的聊聊这个  Stream .
# 以 "器" 始:从使用开始
你平时是怎么使用
Stream的?
比如我会使用 Stream  创建一个流。
| 1 | Stream<Integer> integerStream = Stream.of(1, 2, 3); | 
或者把一种集合类型转成 stream ,然后做一些聚合操作
| 1 | List<Integer> collect = list.stream() | 
那在 jdk1.7  及以前的时候,我们是怎么处理的呢?
| 1 | // 遍历list,所有元素+5 | 
根据上面的对比,我们很明显的就能对比出来:
 stream  的编码方式,使代码更加简洁,可读性也比较强。而且 Stream  提供了集合的常用操作,比如 sort , 过滤 , 去重 , 计数 , limit , skip  等等,直接可以用,可以大大的提高开发效率。
那  Stream  为我们提供了多少功能呢?
从全局来看,所有和  stream  相关的类,都在  java.lang.stream  这包下。
这个包下有很多的类。总体来说,
流处理相关的操作分为两类:
- 中间操作 ( Intermediate Operations)- 无状态的中间操作 ( Stateless): 使用StatelessOp表示。每个操作都是互不影响,不依赖的。这类的操作有:filter()、flatMap()、flatMapToDouble()、flatMapToInt()、flatMapToLong()、map()、mapToDouble()、mapToInt()、mapToLong()、peek()、unordered()等
- 有状态操作( Stateful):使用StatefulOp表示。处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如distinct()、sorted()、sorted(comparator)、limit()、skip()等
 
- 无状态的中间操作 ( 
- 终止操作 ( Terminal Operations):使用TerminalOp表示。- 非短路操作:处理完所有数据才能得到结果。如 collect()、count()、forEach()、forEachOrdered()、max()、min()、reduce()、toArray()等。
- 短路( short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如anyMatch()、allMatch()、noneMatch()、findFirst()、findAny()等。
 
- 非短路操作:处理完所有数据才能得到结果。如 
在深入探讨 stream  之前,我们需要储备些知识点。
- 
函数式接口 FunctionInterfaceJDK提供了很多的函数式接口,包路径是:java.util.function. 函数式接口的作用是 Java8 对一类特定类型接口的称呼。这类接口只有一个抽象方法,并且使用@FunctionInterface注解进行注明。在Java Lambda的实现中, 开发组不想再为Lambda表达式单独定义一种特殊的Structural函数类型,称之为箭头类型(arrow type), 依然想采用 Java 既有的类型系统 (class,interface,method等), 原因是增加一个结构化的函数类型会增加函数类型的复杂性,破坏既有的Java类型,并对成千上万的Java类库造成严重的影响。 权衡利弊, 因此最终还是利用SAM接口 (Single Abstract Method) 作为Lambda表达式的目标类型。函数式接口其实在 Jdk8之前就已存在了,比如java.lang.Runnable,java.util.concurrent.Callable,java.util.Comparator等等。只是没有使用@FunctionInterface注解而已。在JDK1.8之后加上了这个注解,并且在java.util.function包下新增很多个函数式接口。 其中,我们需要知道的只有六个:- Predicate: 传入一个参数,返回一个- bool结果, 方法为- boolean test(T t)
- Consumer: 传入一个参数,无返回值,纯消费。 方法为- void accept(T t)
- Function<T,R>: 传入一个参数,返回一个结果,方法为- R apply(T t)
- Supplier: 无参数传入,返回一个结果,方法为- T get()
- UnaryOperator: 一元操作符, 继承- Function<T,T>, 传入参数的类型和返回类型相同。
- BinaryOperator: 二元操作符, 传入的两个参数的类型和返回类型相同, 继承- BiFunction<T,T,T>
 
为什么要了解这个 函数式接口呢?
因为 在 Stream  的方法中,大部分的参数都是使用 函数式接口 接受参数的。所以,如果要探究其实现原理和设计原则的话,这个是必须要知道的。
注意:
lambda表达式,是一种语法的表现形式,使代码表现更加整洁lambda和stream是两个不相关的概念。
# 查 "术" 理: (查看源码,明晰基本的类结构)
先来看下 和  Stream  直接相关的类。

Stream  接口继承了 BaseStream  接口.
✔️ BaseStream 接口表示流的基本接口,而流是支持顺序和并行聚合操作的元素序列。
 Stream  接口有很多实现类。其主要的一个实现类是  ReferencePipeline  类。除此之外 ReferencePipeline  类还继承了 AbstractPipeline  抽象类. ✔️ AbstractPipeline 表示 “管道” 类的抽象基类,它们是 Stream 接口及其原始特化的核心实现。再看 AbstractPipeline  类的父类 PipelineHelper ,✔️ AbstractPipeline 的作用是:用于执行流管道的辅助类,将有关流管道的所有信息(输出形状、中间操作、流标志、并行度等)集中在一个地方。
ReferencePipeline  类有三个子类:  StatefulOp  表示有状态的操作, StatelessOp  表示无状态的操作,  Head  表示  ReferencePipeline  的起始阶段。 当然了,这三个子类也是 流。
# 从创建流开始
不管是使用  Stream.of(T t)  还是  Collection.stream() ,还是 Arrays.stream() , 底层的实现都是通过  StreamSupport.stream()  来实现的。

✔️ StreamSupport 类的作用是:用于创建和操作流的底层实用方法。

可以看到 直接返回的是  ReferencePipeline.Head  对象。 首先  Head  是一种 Stream  的实现。 接着去看  Head  的构造方法,可以看到其实调用的是: AbstractPipeline  的构造方法.
# 流的中间操作
文中已经谈及了 中间操作分为有状态的中间操作和无状态的中间操作。那我们以一个案例来说明操作与操作之间执行的。
| 1 | List<Integer> numbers = Stream.of(1, 2, 3, 4) | 
Stream.of()  方法上文已经简单的说明了,接下来我们来看  map()  方法。

可以看到, map()  返回了一个 StatelessOp  对象,并且重写了 AbstractPipeline  的 opWrapSink  方法。 之前也说过:它表示流的无状态中间阶段的基类。 还有一个 Sink  类型.  Sink  类表示  Consumer  接口的扩展,用于在流管道的各个阶段传递值,以及管理大小信息、控制流等的附加方法。
我们再仔细看一下这个方法。首先这个方法并没有进行任何的计算,只是将  item -> item + 5  这个操作进行三层的封装, 1. 将 map  方法的返回值重新封装成了流对象,2. 把我们的 item -> item + 5  这个操作封装成了  StatelessOp , 并重写了 opWrapSink  这个方法,并在终止操作时进行调用。 3. 使用 sink ( Sink.ChainedReference)  将管道的各个阶段连接起来。即赋值 downStream . 使用 downstream  这个 Consumer  完成 accept  调用。
这里需要注意一下:  StatelessOp  类的构造方法的实体参传输了一个  this  字段。仔细翻看源码就会返现它一直调用到  AbstractPipeline  的构造方法中。

可以看到  AbstractPipeline  中有两个字段  nextStage  和  previousStage  字段,分别表示的是上一阶段和下一阶段。其中  nextStage  是 当前阶段。  previousStage  则应该 当前阶段的上一个阶段,其实就是调用当前方法的对象。
不知道你是否发现 通过这种方法, stream  组成了一个 流各个阶段的双向链表。节点就是流操作的各个阶段。
ps: 这样一次流操作会创建两个链表:  Stream  阶段的双向链表,和 在终止操作时,根据双向链表生成的  Sink  链表。
再次说明:到目前为止, map()  方法里只是进行了封装,没有进行任何计算!
接着来看  sorted()  方法。

sorted  方法比较简单,通过调用 SortedOps  类的 makeRef  方法,创建了 OfRef  对象。  OfRef  类的作用是:用于对流进行排序的专用子类型。  OfRef  类继承了  ReferencePipeline.StatefulOp  ,所以 OfRef  是一个有状态操作。那自然它也会有  opWrapSink  方法。也就是说它也会返回一个 Sink 对象,只是这个 Sink  对象的实现类不一样的。
说明:到目前为止,
sorted()方法里只是进行了封装,没有进行任何计算!
同理去看 limit  方法。

这个方法的内部是直接创建了一个  ReferencePipeline.StatefulOp  对象,也是重写了其中的方法:  opWrapSink .
不知道你是否有好奇,我为什么每次都会提到  opWrapSink  这个方法呢?因为这个方法非常的重要!其重要性我们在 探” 法 “择 这部分会完整的说明。
再三说明:到目前为止,
limit()方法里只是进行了封装,没有进行任何计算!
书行至此,案例中的中间操作都已经简单的分析完成了。我们就知道这里  jdk  为了完成 流操作为每个中间操作都封装了很多的对象,而这些对象只是散列在了内存中。接下来,就要看  jdk  是如何把他们组装到一起的。
# 终止操作
以 Collect  方法为例,去探究一下终止操作的流程。

可以看到在 collect  方法中,分为并行执行方式和串行执行方法,我们看串行执行时,会创建  ReduceOps  终止操作对象。

将 终止操作 传递给 evaluate 方法,然后调用终止操作的 evaluate 方法,当然这个方法也分成了串行执行和并行执行两种。

helper  其实是  limit(3)  中间操作返回的对象。这其实中间操作的最后一个 Stage  (阶段)。返回的对象是 AbstractPipeline  和  Stream  的子类实例。

这里包含两个方法:  wrapSink()  和  copyInfo() .
这是两个非常重要的方法.  wrapSink()  是将中间的操作组成  SinkChain  。  copyInfo()  这是执行真正的计算逻辑。

方法中的形参  sink  就是最后的阶段的终止操作。方法通过循环将  sink  分装到 Sink  中。  Sink  接口 的一个实现类是  ChainedReference  , 类中定义了一个  downStream  字段。 会将 sink = p.opWrapSink(p.previousStage.combinedFlags, sink);   中的  sink  赋值给  downStream . 这样就形成了 套娃。 最后返回一个  wrapSink  , 即整个流操作中所有的操作的 封装 Sink .

图中所示的即为上面提及的 封装 Sink . 可中断和不可中断的区别是:可中断如果获取值,就不必再取所有的结果了。反之,就需要计算出所有阶段的结果。
非可中断的终止操作时,会执行  begin() , forEachRemaining() , end() , 三个方法。  这个三个方法对应的是: Sink  接口中提供的三个方法。
| 1 | // 每个Sink开始之前调用该方法,通知sink做好准备 | 
其中,
- begin()方法,会调用每个- Sink子类的- begin方法。
- forEachRemaining()方法对应的执行内容如下图:
 ![]() 
- end()方法,会调用每个- Sink字段的- end方法。
书行至此。或许你会对  forEachRemaining  方法感到好奇。后面我会写一篇文章来专门分享: 《 Stream  的高级迭代器》, 希望你能继续关注支持我~
# 探” 法 “择
我们从一个案例出发,在细节之处分析了一个 Stream  的执行过程。现在我们需要从全局来看一下  Stream  的执行过程是什么样子的.
上文中我们知道了  Stream  的 所有计算都是在 终止操作时 触发的。 所有的中间操作都是封装了一些对象。我们用一张图来描述下 Stream  的执行过程。

- stream将创建的流做为第一个- Stage, 用来代表流的开始, 每个- Stage都是- AbstractPipeline的子类。 第一个- Stage是- AbstractPipeline.Head对象。
- 然后将中间操作封装成后面的 n 个  stage. 并组成 双向链表的形式,并且存储了stage0. 每个Stage都是StatelessOp或者statefulOp.
- 终止操作通过 wrapSink()方法 会触发将 每个阶段的操作封装成Sink. 并且sink都会做为参数传递到上一个阶段的opWrapSink()方法中,从而组成一个sink链表。
- 然后,通过  copyInfo()方法将,交于Spilterator进行迭代。计算的结果可以分为四种- 返回 boolean类型的结果:比如anyMatch()allMatch()noneMatch()方法。
- 返回 Optional类型的结果: 比如findFirst()findAny()方法
- 还有归约操作:	 reduce()collect()
- 返回数组的: toArray()
 对于表中返回boolean或者Optional的操作(Optional是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。
 对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect(),reduce(),max(),min()都是归约操作,虽然max()和min()也是返回一个Optional,但事实上底层是通过调用reduce()方法实现的。
 对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于Stream的并行计算,我后面会继续分享。
 
- 返回 
# 明 "道" 义
JDK  提供的  Stream  具有如下特点:
- 无存储。 stream不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组,Java容器或I/O channel等。
- 为函数式编程而生。对 stream的任何修改都不会修改背后的数据源,比如对stream执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新stream。
- 惰式执行。 stream上的操作并不会立即执行,只有等到用户真正需要结果的时候才会执行。
- 可消费性。 stream只能被 “消费” 一次,一旦遍历过就会失效,就像容器的迭代器那样,想要再次遍历必须重新生成。
在这一趴,我就围绕两个点来简单的聊聊。
- JDK8为什么要加入- Stream.
除了上面四个特点之外, Java8  中的 Stream  是对集合对象的增强,当然不仅仅是集合对象。 Stream  为开发者提供了简洁的编码方式和编码风格,极大的提高了开发的效率。
另外一个更重要的点在于  Stream  为我们下篇文章要分享的  Stream  并行计算流 提供了实现,请期待。
- Stream为什么要这么设计?
我这里给一份我的回答,这个问题也留给看文章的你,也希望能看到你的回答。
根据上文所说的内容, Stream  体系是一组接口家族, AbstractPipeline  是接口的实现, PipelineHelper  是管道的辅助类, StreamSupport  是流的底层工具类
Stream  使用 stage  来抽象流水线上的每个操作,其实每个 stage  就是一个 stream  子类的实例, 也就是 AbstractPipeline  几个子类的内部子类即 Head   StatelessOp   statefulOp ;
StreamSupport  用于创建生成 Stream  对应的是 Head  类,其他的中间操作分为有状态和无状态的,中间操作通过方法比如  filter   map  等返回的是 StatelessOp  或者  statefulOp .  多个 stage  组合称为双向链表的形式 从而成了整个流水线
有了流水线,相邻两个操作阶段之间如何协调运算?
于是又有了 Sink  的概念,又来协调相邻的 stage  之间计算运行
他的模式是 begin    accept   end  还有短路标记
他的 accept  就是封装了回调方法,所以说每个操作 stage ,  StatelessOp   或者  statefulOp  中又封装了 Sink . 通过 AbstractPipeline  提供的 opWrapSink  方法可以获取这个 Sink
调用这个 sink  的 accept  方法就可以调用当前操作的方法
那么如何串联起来呢?
关键点在于 opWrapSink  方法,他接收一个 Sink  作为参数,在调用 accept  方法中。可以调用这个入参 Sink  的 accept  方法
这样子从当前就能调用下一个,也就是说有了推动的动作。那么只需要找到开始,每个处理了之后都推动下一个,就顺序完成了所欲的操作了。
# 结语
通过看  Stream  相关的知识点,发现一篇文章是没法讲清楚的。
这一次,我又果不其然的留下了两篇文章
请给我记代办~
在分享 并行计算流 的时候,我们需要以  JDK1.7  中的  forkJoin  框架为前提,来分析  Stream  的  parallelStream .
在分享 迭代器 的时候,我们也会分析一下 JDK  中提供的 普通迭代器,比如  ForEach ,  iterator , 以及 Stream  的高级迭代器  spliterator . 也会由浅入深的分析一下,各种迭代器的优缺点。 也会自定义实现一个迭代器。
敬请期待,防止走丢见文末。关注我,期望和你一起遇见更好的自己.
# 最后
期望和你一起遇见更好的自己


 
          