学东西的时候最好是理论先行,为什么?没有理论,想当然的去干,干好了是 瞎猫碰上死耗子,干不好就瞎干,浪费时间,只会弄得身心俱疲。
可是在真正的工作中,很少工作会允许你先弄清原理再去实操。但是不管怎么说,欠下的债终究是需要还的。
今天咱们的主题是 stream
. 咱们就从 Stream
的 "道,术,法,器" 四个阶段来聊好好的聊聊这个 Stream
.
# 以 "器" 始:从使用开始
你平时是怎么使用
Stream
的?
比如我会使用 Stream
创建一个流。
1 | Stream<Integer> integerStream = Stream.of(1, 2, 3); |
或者把一种集合类型转成 stream
,然后做一些聚合操作
1 | List<Integer> collect = list.stream() |
那在 jdk1.7
及以前的时候,我们是怎么处理的呢?
1 | // 遍历list,所有元素+5 |
根据上面的对比,我们很明显的就能对比出来:
stream
的编码方式,使代码更加简洁,可读性也比较强。而且 Stream
提供了集合的常用操作,比如 sort
, 过滤
, 去重
, 计数
, limit
, skip
等等,直接可以用,可以大大的提高开发效率。
那 Stream
为我们提供了多少功能呢?
从全局来看,所有和 stream
相关的类,都在 java.lang.stream
这包下。
这个包下有很多的类。总体来说,
流处理相关的操作分为两类:
- 中间操作 (
Intermediate Operations
)- 无状态的中间操作 (
Stateless
): 使用StatelessOp
表示。每个操作都是互不影响,不依赖的。这类的操作有:filter()
、flatMap()
、flatMapToDouble()
、flatMapToInt()
、flatMapToLong()
、map()
、mapToDouble()
、mapToInt()
、mapToLong()
、peek()
、unordered()
等 - 有状态操作(
Stateful
):使用StatefulOp
表示。处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如distinct()
、sorted()
、sorted(comparator)
、limit()
、skip()
等
- 无状态的中间操作 (
- 终止操作 (
Terminal Operations
):使用TerminalOp
表示。- 非短路操作:处理完所有数据才能得到结果。如
collect()
、count()
、forEach()
、forEachOrdered()
、max()
、min()
、reduce()
、toArray()
等。 - 短路(
short-circuiting
)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如anyMatch()
、allMatch()
、noneMatch()
、findFirst()
、findAny()
等。
- 非短路操作:处理完所有数据才能得到结果。如
在深入探讨 stream
之前,我们需要储备些知识点。
-
函数式接口
FunctionInterface
JDK
提供了很多的函数式接口,包路径是:java.util.function
. 函数式接口的作用是 Java8 对一类特定类型接口的称呼。这类接口只有一个抽象方法,并且使用@FunctionInterface
注解进行注明。在Java Lambda
的实现中, 开发组不想再为Lambda
表达式单独定义一种特殊的Structural
函数类型,称之为箭头类型(arrow type
), 依然想采用 Java 既有的类型系统 (class
,interface
,method
等), 原因是增加一个结构化的函数类型会增加函数类型的复杂性,破坏既有的Java
类型,并对成千上万的Java
类库造成严重的影响。 权衡利弊, 因此最终还是利用SAM
接口 (Single Abstract Method
) 作为Lambda
表达式的目标类型。函数式接口其实在
Jdk8
之前就已存在了,比如java.lang.Runnable
,java.util.concurrent.Callable
,java.util.Comparator
等等。只是没有使用@FunctionInterface
注解而已。在JDK1.8
之后加上了这个注解,并且在java.util.function
包下新增很多个函数式接口。 其中,我们需要知道的只有六个:Predicate
: 传入一个参数,返回一个bool
结果, 方法为boolean test(T t)
Consumer
: 传入一个参数,无返回值,纯消费。 方法为void accept(T t)
Function<T,R>
: 传入一个参数,返回一个结果,方法为R apply(T t)
Supplier
: 无参数传入,返回一个结果,方法为T get()
UnaryOperator
: 一元操作符, 继承Function<T,T>
, 传入参数的类型和返回类型相同。BinaryOperator
: 二元操作符, 传入的两个参数的类型和返回类型相同, 继承BiFunction<T,T,T>
为什么要了解这个 函数式接口呢?
因为 在 Stream
的方法中,大部分的参数都是使用 函数式接口 接受参数的。所以,如果要探究其实现原理和设计原则的话,这个是必须要知道的。
注意:
lambda
表达式,是一种语法的表现形式,使代码表现更加整洁lambda
和stream
是两个不相关的概念。
# 查 "术" 理: (查看源码,明晰基本的类结构)
先来看下 和 Stream
直接相关的类。
Stream
接口继承了 BaseStream
接口.
✔️ BaseStream 接口表示流的基本接口,而流是支持顺序和并行聚合操作的元素序列。
Stream
接口有很多实现类。其主要的一个实现类是 ReferencePipeline
类。除此之外 ReferencePipeline
类还继承了 AbstractPipeline
抽象类. ✔️ AbstractPipeline 表示 “管道” 类的抽象基类,它们是 Stream 接口及其原始特化的核心实现。再看 AbstractPipeline
类的父类 PipelineHelper
,✔️ AbstractPipeline 的作用是:用于执行流管道的辅助类,将有关流管道的所有信息(输出形状、中间操作、流标志、并行度等)集中在一个地方。
ReferencePipeline
类有三个子类: StatefulOp
表示有状态的操作, StatelessOp
表示无状态的操作, Head
表示 ReferencePipeline
的起始阶段。 当然了,这三个子类也是 流。
# 从创建流开始
不管是使用 Stream.of(T t)
还是 Collection.stream()
,还是 Arrays.stream()
, 底层的实现都是通过 StreamSupport.stream()
来实现的。
✔️ StreamSupport 类的作用是:用于创建和操作流的底层实用方法。
可以看到 直接返回的是 ReferencePipeline.Head
对象。 首先 Head
是一种 Stream
的实现。 接着去看 Head
的构造方法,可以看到其实调用的是: AbstractPipeline
的构造方法.
# 流的中间操作
文中已经谈及了 中间操作分为有状态的中间操作和无状态的中间操作。那我们以一个案例来说明操作与操作之间执行的。
1 | List<Integer> numbers = Stream.of(1, 2, 3, 4) |
Stream.of()
方法上文已经简单的说明了,接下来我们来看 map()
方法。
可以看到, map()
返回了一个 StatelessOp
对象,并且重写了 AbstractPipeline
的 opWrapSink
方法。 之前也说过:它表示流的无状态中间阶段的基类。 还有一个 Sink
类型. Sink
类表示 Consumer
接口的扩展,用于在流管道的各个阶段传递值,以及管理大小信息、控制流等的附加方法。
我们再仔细看一下这个方法。首先这个方法并没有进行任何的计算,只是将 item -> item + 5
这个操作进行三层的封装, 1. 将 map
方法的返回值重新封装成了流对象,2. 把我们的 item -> item + 5
这个操作封装成了 StatelessOp
, 并重写了 opWrapSink
这个方法,并在终止操作时进行调用。 3. 使用 sink
( Sink.ChainedReference)
将管道的各个阶段连接起来。即赋值 downStream
. 使用 downstream
这个 Consumer
完成 accept
调用。
这里需要注意一下: StatelessOp
类的构造方法的实体参传输了一个 this
字段。仔细翻看源码就会返现它一直调用到 AbstractPipeline
的构造方法中。
可以看到 AbstractPipeline
中有两个字段 nextStage
和 previousStage
字段,分别表示的是上一阶段和下一阶段。其中 nextStage
是 当前阶段。 previousStage
则应该 当前阶段的上一个阶段,其实就是调用当前方法的对象。
不知道你是否发现 通过这种方法, stream
组成了一个 流各个阶段的双向链表。节点就是流操作的各个阶段。
ps: 这样一次流操作会创建两个链表: Stream
阶段的双向链表,和 在终止操作时,根据双向链表生成的 Sink
链表。
再次说明:到目前为止, map()
方法里只是进行了封装,没有进行任何计算!
接着来看 sorted()
方法。
sorted
方法比较简单,通过调用 SortedOps
类的 makeRef
方法,创建了 OfRef
对象。 OfRef
类的作用是:用于对流进行排序的专用子类型。 OfRef
类继承了 ReferencePipeline.StatefulOp
,所以 OfRef
是一个有状态操作。那自然它也会有 opWrapSink
方法。也就是说它也会返回一个 Sink 对象,只是这个 Sink
对象的实现类不一样的。
说明:到目前为止,
sorted()
方法里只是进行了封装,没有进行任何计算!
同理去看 limit
方法。
这个方法的内部是直接创建了一个 ReferencePipeline.StatefulOp
对象,也是重写了其中的方法: opWrapSink
.
不知道你是否有好奇,我为什么每次都会提到 opWrapSink
这个方法呢?因为这个方法非常的重要!其重要性我们在 探” 法 “择 这部分会完整的说明。
再三说明:到目前为止,
limit()
方法里只是进行了封装,没有进行任何计算!
书行至此,案例中的中间操作都已经简单的分析完成了。我们就知道这里 jdk
为了完成 流操作为每个中间操作都封装了很多的对象,而这些对象只是散列在了内存中。接下来,就要看 jdk
是如何把他们组装到一起的。
# 终止操作
以 Collect
方法为例,去探究一下终止操作的流程。
可以看到在 collect
方法中,分为并行执行方式和串行执行方法,我们看串行执行时,会创建 ReduceOps
终止操作对象。
将 终止操作 传递给 evaluate 方法,然后调用终止操作的 evaluate 方法,当然这个方法也分成了串行执行和并行执行两种。
helper
其实是 limit(3)
中间操作返回的对象。这其实中间操作的最后一个 Stage
(阶段)。返回的对象是 AbstractPipeline
和 Stream
的子类实例。
这里包含两个方法: wrapSink()
和 copyInfo()
.
这是两个非常重要的方法. wrapSink()
是将中间的操作组成 SinkChain
。 copyInfo()
这是执行真正的计算逻辑。
方法中的形参 sink
就是最后的阶段的终止操作。方法通过循环将 sink
分装到 Sink
中。 Sink
接口 的一个实现类是 ChainedReference
, 类中定义了一个 downStream
字段。 会将 sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
中的 sink
赋值给 downStream
. 这样就形成了 套娃。 最后返回一个 wrapSink
, 即整个流操作中所有的操作的 封装 Sink
.
图中所示的即为上面提及的 封装 Sink
. 可中断和不可中断的区别是:可中断如果获取值,就不必再取所有的结果了。反之,就需要计算出所有阶段的结果。
非可中断的终止操作时,会执行 begin()
, forEachRemaining()
, end()
, 三个方法。 这个三个方法对应的是: Sink
接口中提供的三个方法。
1 | // 每个Sink开始之前调用该方法,通知sink做好准备 |
其中,
begin()
方法,会调用每个Sink
子类的begin
方法。forEachRemaining()
方法对应的执行内容如下图:
end()
方法,会调用每个Sink
字段的end
方法。
书行至此。或许你会对 forEachRemaining
方法感到好奇。后面我会写一篇文章来专门分享: 《 Stream
的高级迭代器》, 希望你能继续关注支持我~
# 探” 法 “择
我们从一个案例出发,在细节之处分析了一个 Stream
的执行过程。现在我们需要从全局来看一下 Stream
的执行过程是什么样子的.
上文中我们知道了 Stream
的 所有计算都是在 终止操作时 触发的。 所有的中间操作都是封装了一些对象。我们用一张图来描述下 Stream
的执行过程。
stream
将创建的流做为第一个Stage
, 用来代表流的开始, 每个Stage
都是AbstractPipeline
的子类。 第一个Stage
是AbstractPipeline.Head
对象。- 然后将中间操作封装成后面的 n 个
stage
. 并组成 双向链表的形式,并且存储了stage0
. 每个Stage
都是StatelessOp
或者statefulOp
. - 终止操作通过
wrapSink()
方法 会触发将 每个阶段的操作封装成Sink
. 并且sink
都会做为参数传递到上一个阶段的opWrapSink()
方法中,从而组成一个sink
链表。 - 然后,通过
copyInfo()
方法将,交于Spilterator
进行迭代。计算的结果可以分为四种- 返回
boolean
类型的结果:比如anyMatch()
allMatch()
noneMatch()
方法。 - 返回
Optional
类型的结果: 比如findFirst()
findAny()
方法 - 还有归约操作:
reduce()
collect()
- 返回数组的:
toArray()
对于表中返回boolean
或者Optional
的操作(Optional
是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink
中记录这个值,等到执行结束时返回就可以了。
对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect()
,reduce()
,max()
,min()
都是归约操作,虽然max()
和min()
也是返回一个Optional
,但事实上底层是通过调用reduce()
方法实现的。
对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做Node
的数据结构中的。Node
是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于Stream
的并行计算,我后面会继续分享。
- 返回
# 明 "道" 义
JDK
提供的 Stream
具有如下特点:
- 无存储。
stream
不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组,Java
容器或I/O channel
等。 - 为函数式编程而生。对
stream
的任何修改都不会修改背后的数据源,比如对stream
执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新stream
。 - 惰式执行。
stream
上的操作并不会立即执行,只有等到用户真正需要结果的时候才会执行。 - 可消费性。
stream
只能被 “消费” 一次,一旦遍历过就会失效,就像容器的迭代器那样,想要再次遍历必须重新生成。
在这一趴,我就围绕两个点来简单的聊聊。
JDK8
为什么要加入Stream
.
除了上面四个特点之外, Java8
中的 Stream
是对集合对象的增强,当然不仅仅是集合对象。 Stream
为开发者提供了简洁的编码方式和编码风格,极大的提高了开发的效率。
另外一个更重要的点在于 Stream
为我们下篇文章要分享的 Stream
并行计算流 提供了实现,请期待。
Stream
为什么要这么设计?
我这里给一份我的回答,这个问题也留给看文章的你,也希望能看到你的回答。
根据上文所说的内容, Stream
体系是一组接口家族, AbstractPipeline
是接口的实现, PipelineHelper
是管道的辅助类, StreamSupport
是流的底层工具类
Stream
使用 stage
来抽象流水线上的每个操作,其实每个 stage
就是一个 stream
子类的实例, 也就是 AbstractPipeline
几个子类的内部子类即 Head
StatelessOp
statefulOp
;
StreamSupport
用于创建生成 Stream
对应的是 Head
类,其他的中间操作分为有状态和无状态的,中间操作通过方法比如 filter
map
等返回的是 StatelessOp
或者 statefulOp
. 多个 stage
组合称为双向链表的形式 从而成了整个流水线
有了流水线,相邻两个操作阶段之间如何协调运算?
于是又有了 Sink
的概念,又来协调相邻的 stage
之间计算运行
他的模式是 begin
accept
end
还有短路标记
他的 accept
就是封装了回调方法,所以说每个操作 stage
, StatelessOp
或者 statefulOp
中又封装了 Sink
. 通过 AbstractPipeline
提供的 opWrapSink
方法可以获取这个 Sink
调用这个 sink
的 accept
方法就可以调用当前操作的方法
那么如何串联起来呢?
关键点在于 opWrapSink
方法,他接收一个 Sink
作为参数,在调用 accept
方法中。可以调用这个入参 Sink
的 accept
方法
这样子从当前就能调用下一个,也就是说有了推动的动作。那么只需要找到开始,每个处理了之后都推动下一个,就顺序完成了所欲的操作了。
# 结语
通过看 Stream
相关的知识点,发现一篇文章是没法讲清楚的。
这一次,我又果不其然的留下了两篇文章
请给我记代办~
在分享 并行计算流
的时候,我们需要以 JDK1.7
中的 forkJoin
框架为前提,来分析 Stream
的 parallelStream
.
在分享 迭代器
的时候,我们也会分析一下 JDK
中提供的 普通迭代器,比如 ForEach
, iterator
, 以及 Stream
的高级迭代器 spliterator
. 也会由浅入深的分析一下,各种迭代器的优缺点。 也会自定义实现一个迭代器。
敬请期待,防止走丢见文末。关注我,期望和你一起遇见更好的自己.
# 最后
期望和你一起遇见更好的自己