学东西的时候最好是理论先行,为什么?没有理论,想当然的去干,干好了是 瞎猫碰上死耗子,干不好就瞎干,浪费时间,只会弄得身心俱疲。
可是在真正的工作中,很少工作会允许你先弄清原理再去实操。但是不管怎么说,欠下的债终究是需要还的。

今天咱们的主题是 stream . 咱们就从 Stream 的 "道,术,法,器" 四个阶段来聊好好的聊聊这个 Stream .

# 以 "器" 始:从使用开始

你平时是怎么使用 Stream 的?

比如我会使用 Stream 创建一个流。

1
2
Stream<Integer> integerStream = Stream.of(1, 2, 3);
// do somethings ..

或者把一种集合类型转成 stream ,然后做一些聚合操作

1
2
3
4
5
6
List<Integer> collect = list.stream()
.map(item -> item + 5)
.filter(item -> item > 10)
.sorted()
.limit(10)
.collect(Collectors.toList());

那在 jdk1.7 及以前的时候,我们是怎么处理的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 遍历list,所有元素+5
for (int i = 0; i < list.size(); i++) {
Integer integer = list.get(i);
integer += 5;
}

// 正序排序
sort(list);

// 创建新List.存储10个元素
List<Integer> newList = new ArrayList<>();
if (list.size() > 10) {
System.arraycopy(list, list.size() - 11, newList, 0, 10);
} else {
System.arraycopy(list, 0, newList, 0, list.size());
}

根据上面的对比,我们很明显的就能对比出来:
stream 的编码方式,使代码更加简洁,可读性也比较强。而且 Stream 提供了集合的常用操作,比如 sort , 过滤去重计数limit , skip 等等,直接可以用,可以大大的提高开发效率。

Stream 为我们提供了多少功能呢?

从全局来看,所有和 stream 相关的类,都在 java.lang.stream 这包下。

这个包下有很多的类。总体来说,

流处理相关的操作分为两类:

  • 中间操作 ( Intermediate Operations )
    • 无状态的中间操作 ( Stateless ): 使用 StatelessOp 表示。每个操作都是互不影响,不依赖的。这类的操作有: filter()flatMap()flatMapToDouble()flatMapToInt()flatMapToLong()map()mapToDouble()mapToInt()mapToLong()peek()unordered()
    • 有状态操作( Stateful ):使用 StatefulOp 表示。处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如 distinct()sorted()sorted(comparator)limit()skip()
  • 终止操作 ( Terminal Operations ):使用 TerminalOp 表示。
    • 非短路操作:处理完所有数据才能得到结果。如 collect()count()forEach()forEachOrdered()max()min()reduce()toArray() 等。
    • 短路( short-circuiting )操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如 anyMatch()allMatch()noneMatch()findFirst()findAny() 等。

在深入探讨 stream 之前,我们需要储备些知识点。

  • 函数式接口 FunctionInterface

    JDK 提供了很多的函数式接口,包路径是: java.util.function . 函数式接口的作用是 Java8 对一类特定类型接口的称呼。这类接口只有一个抽象方法,并且使用 @FunctionInterface 注解进行注明。在 Java Lambda 的实现中, 开发组不想再为 Lambda 表达式单独定义一种特殊的 Structural 函数类型,称之为箭头类型( arrow type ), 依然想采用 Java 既有的类型系统 ( class , interface , method 等), 原因是增加一个结构化的函数类型会增加函数类型的复杂性,破坏既有的 Java 类型,并对成千上万的 Java 类库造成严重的影响。 权衡利弊, 因此最终还是利用 SAM 接口 ( Single Abstract Method ) 作为 Lambda 表达式的目标类型。

    函数式接口其实在 Jdk8 之前就已存在了,比如 java.lang.Runnable , java.util.concurrent.Callable , java.util.Comparator 等等。只是没有使用 @FunctionInterface 注解而已。在 JDK1.8 之后加上了这个注解,并且在 java.util.function 包下新增很多个函数式接口。 其中,我们需要知道的只有六个:

    • Predicate : 传入一个参数,返回一个 bool 结果, 方法为 boolean test(T t)
    • Consumer : 传入一个参数,无返回值,纯消费。 方法为 void accept(T t)
    • Function<T,R> : 传入一个参数,返回一个结果,方法为 R apply(T t)
    • Supplier : 无参数传入,返回一个结果,方法为 T get()
    • UnaryOperator : 一元操作符, 继承 Function<T,T> , 传入参数的类型和返回类型相同。
    • BinaryOperator : 二元操作符, 传入的两个参数的类型和返回类型相同, 继承 BiFunction<T,T,T>

为什么要了解这个 函数式接口呢?

因为 在 Stream 的方法中,大部分的参数都是使用 函数式接口 接受参数的。所以,如果要探究其实现原理和设计原则的话,这个是必须要知道的。

注意:
lambda 表达式,是一种语法的表现形式,使代码表现更加整洁 lambdastream 是两个不相关的概念。

# 查 "术" 理: (查看源码,明晰基本的类结构)

先来看下 和 Stream 直接相关的类。

Stream 接口继承了 BaseStream 接口.

✔️ BaseStream 接口表示流的基本接口,而流是支持顺序和并行聚合操作的元素序列。
Stream 接口有很多实现类。其主要的一个实现类是 ReferencePipeline 类。除此之外 ReferencePipeline 类还继承了 AbstractPipeline 抽象类. ✔️ AbstractPipeline 表示 “管道” 类的抽象基类,它们是 Stream 接口及其原始特化的核心实现。再看 AbstractPipeline 类的父类 PipelineHelper ,✔️ AbstractPipeline 的作用是:用于执行流管道的辅助类,将有关流管道的所有信息(输出形状、中间操作、流标志、并行度等)集中在一个地方。

ReferencePipeline 类有三个子类: StatefulOp 表示有状态的操作, StatelessOp 表示无状态的操作, Head 表示 ReferencePipeline 的起始阶段。 当然了,这三个子类也是 流。

# 从创建流开始

不管是使用 Stream.of(T t) 还是 Collection.stream() ,还是 Arrays.stream() , 底层的实现都是通过 StreamSupport.stream() 来实现的。

✔️ StreamSupport 类的作用是:用于创建和操作流的底层实用方法。

可以看到 直接返回的是 ReferencePipeline.Head 对象。 首先 Head 是一种 Stream 的实现。 接着去看 Head 的构造方法,可以看到其实调用的是: AbstractPipeline 的构造方法.

# 流的中间操作

文中已经谈及了 中间操作分为有状态的中间操作和无状态的中间操作。那我们以一个案例来说明操作与操作之间执行的。

1
2
3
4
5
List<Integer> numbers = Stream.of(1, 2, 3, 4)
.map(item -> item + 5)
.sorted((n1, n2) -> n2 - n1)
.limit(3)
.collect(Collectors.toList());

Stream.of() 方法上文已经简单的说明了,接下来我们来看 map() 方法。

可以看到, map() 返回了一个 StatelessOp 对象,并且重写了 AbstractPipelineopWrapSink 方法。 之前也说过:它表示流的无状态中间阶段的基类。 还有一个 Sink 类型. Sink 类表示 Consumer 接口的扩展,用于在流管道的各个阶段传递值,以及管理大小信息、控制流等的附加方法。

我们再仔细看一下这个方法。首先这个方法并没有进行任何的计算,只是将 item -> item + 5 这个操作进行三层的封装, 1. 将 map 方法的返回值重新封装成了流对象,2. 把我们的 item -> item + 5 这个操作封装成了 StatelessOp , 并重写了 opWrapSink 这个方法,并在终止操作时进行调用。 3. 使用 sink ( Sink.ChainedReference) 将管道的各个阶段连接起来。即赋值 downStream . 使用 downstream 这个 Consumer 完成 accept 调用。

这里需要注意一下: StatelessOp 类的构造方法的实体参传输了一个 this 字段。仔细翻看源码就会返现它一直调用到 AbstractPipeline 的构造方法中。

可以看到 AbstractPipeline 中有两个字段 nextStagepreviousStage 字段,分别表示的是上一阶段和下一阶段。其中 nextStage 是 当前阶段。 previousStage 则应该 当前阶段的上一个阶段,其实就是调用当前方法的对象。

不知道你是否发现 通过这种方法, stream 组成了一个 流各个阶段的双向链表。节点就是流操作的各个阶段。

ps: 这样一次流操作会创建两个链表: Stream 阶段的双向链表,和 在终止操作时,根据双向链表生成的 Sink 链表。

再次说明:到目前为止, map() 方法里只是进行了封装,没有进行任何计算!

接着来看 sorted() 方法。

sorted 方法比较简单,通过调用 SortedOps 类的 makeRef 方法,创建了 OfRef 对象。 OfRef 类的作用是:用于对流进行排序的专用子类型。 OfRef 类继承了 ReferencePipeline.StatefulOp ,所以 OfRef 是一个有状态操作。那自然它也会有 opWrapSink 方法。也就是说它也会返回一个 Sink 对象,只是这个 Sink 对象的实现类不一样的。

说明:到目前为止, sorted() 方法里只是进行了封装,没有进行任何计算!

同理去看 limit 方法。

这个方法的内部是直接创建了一个 ReferencePipeline.StatefulOp 对象,也是重写了其中的方法: opWrapSink .

不知道你是否有好奇,我为什么每次都会提到 opWrapSink 这个方法呢?因为这个方法非常的重要!其重要性我们在 探” 法 “择 这部分会完整的说明。

再三说明:到目前为止, limit() 方法里只是进行了封装,没有进行任何计算!

书行至此,案例中的中间操作都已经简单的分析完成了。我们就知道这里 jdk 为了完成 流操作为每个中间操作都封装了很多的对象,而这些对象只是散列在了内存中。接下来,就要看 jdk 是如何把他们组装到一起的。

# 终止操作

Collect 方法为例,去探究一下终止操作的流程。


可以看到在 collect 方法中,分为并行执行方式和串行执行方法,我们看串行执行时,会创建 ReduceOps 终止操作对象。

将 终止操作 传递给 evaluate 方法,然后调用终止操作的 evaluate 方法,当然这个方法也分成了串行执行和并行执行两种。

helper 其实是 limit(3) 中间操作返回的对象。这其实中间操作的最后一个 Stage (阶段)。返回的对象是 AbstractPipelineStream 的子类实例。

这里包含两个方法: wrapSink()copyInfo() .

这是两个非常重要的方法. wrapSink() 是将中间的操作组成 SinkChaincopyInfo() 这是执行真正的计算逻辑。

方法中的形参 sink 就是最后的阶段的终止操作。方法通过循环将 sink 分装到 Sink 中。 Sink 接口 的一个实现类是 ChainedReference , 类中定义了一个 downStream 字段。 会将 sink = p.opWrapSink(p.previousStage.combinedFlags, sink); 中的 sink 赋值给 downStream . 这样就形成了 套娃。 最后返回一个 wrapSink , 即整个流操作中所有的操作的 封装 Sink .

图中所示的即为上面提及的 封装 Sink . 可中断和不可中断的区别是:可中断如果获取值,就不必再取所有的结果了。反之,就需要计算出所有阶段的结果。

非可中断的终止操作时,会执行 begin() , forEachRemaining() , end() , 三个方法。 这个三个方法对应的是: Sink 接口中提供的三个方法。

1
2
3
4
5
6
// 每个Sink开始之前调用该方法,通知sink做好准备
default void begin(long size) {}
// 遍历元素时使用,接受一个待处理元素,并对元素进行处理。
default void accept(Double i);
// 通知sink没有元素进行处理了。
default void end() {}

其中,

  • begin() 方法,会调用每个 Sink 子类的 begin 方法。
  • forEachRemaining() 方法对应的执行内容如下图:
  • end() 方法,会调用每个 Sink 字段的 end 方法。

书行至此。或许你会对 forEachRemaining 方法感到好奇。后面我会写一篇文章来专门分享: 《 Stream 的高级迭代器》, 希望你能继续关注支持我~

# 探” 法 “择

我们从一个案例出发,在细节之处分析了一个 Stream 的执行过程。现在我们需要从全局来看一下 Stream 的执行过程是什么样子的.

上文中我们知道了 Stream 的 所有计算都是在 终止操作时 触发的。 所有的中间操作都是封装了一些对象。我们用一张图来描述下 Stream 的执行过程。

  • stream 将创建的流做为第一个 Stage , 用来代表流的开始, 每个 Stage 都是 AbstractPipeline 的子类。 第一个 StageAbstractPipeline.Head 对象。
  • 然后将中间操作封装成后面的 n 个 stage . 并组成 双向链表的形式,并且存储了 stage0 . 每个 Stage 都是 StatelessOp 或者 statefulOp .
  • 终止操作通过 wrapSink() 方法 会触发将 每个阶段的操作封装成 Sink . 并且 sink 都会做为参数传递到上一个阶段的 opWrapSink() 方法中,从而组成一个 sink 链表。
  • 然后,通过 copyInfo() 方法将,交于 Spilterator 进行迭代。计算的结果可以分为四种
    • 返回 boolean 类型的结果:比如 anyMatch() allMatch() noneMatch() 方法。
    • 返回 Optional 类型的结果: 比如 findFirst() findAny() 方法
    • 还有归约操作: reduce() collect()
    • 返回数组的: toArray()
      对于表中返回 boolean 或者 Optional 的操作( Optional 是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的 Sink 中记录这个值,等到执行结束时返回就可以了。
      对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。 collect() , reduce() , max() , min() 都是归约操作,虽然 max()min() 也是返回一个 Optional ,但事实上底层是通过调用 reduce() 方法实现的。
      对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做 Node 的数据结构中的。 Node 是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于 Stream 的并行计算,我后面会继续分享。

# 明 "道" 义

JDK 提供的 Stream 具有如下特点:

  • 无存储。 stream 不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组, Java 容器或 I/O channel 等。
  • 为函数式编程而生。对 stream 的任何修改都不会修改背后的数据源,比如对 stream 执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新 stream
  • 惰式执行。 stream 上的操作并不会立即执行,只有等到用户真正需要结果的时候才会执行。
  • 可消费性。 stream 只能被 “消费” 一次,一旦遍历过就会失效,就像容器的迭代器那样,想要再次遍历必须重新生成。

在这一趴,我就围绕两个点来简单的聊聊。

  • JDK8 为什么要加入 Stream .

除了上面四个特点之外, Java8 中的 Stream 是对集合对象的增强,当然不仅仅是集合对象。 Stream 为开发者提供了简洁的编码方式和编码风格,极大的提高了开发的效率。

另外一个更重要的点在于 Stream 为我们下篇文章要分享的 Stream 并行计算流 提供了实现,请期待。

  • Stream 为什么要这么设计?

我这里给一份我的回答,这个问题也留给看文章的你,也希望能看到你的回答。

根据上文所说的内容, Stream 体系是一组接口家族, AbstractPipeline 是接口的实现, PipelineHelper 是管道的辅助类, StreamSupport 是流的底层工具类

Stream 使用 stage 来抽象流水线上的每个操作,其实每个 stage 就是一个 stream 子类的实例, 也就是 AbstractPipeline 几个子类的内部子类即 Head StatelessOp statefulOp ;

StreamSupport 用于创建生成 Stream 对应的是 Head 类,其他的中间操作分为有状态和无状态的,中间操作通过方法比如 filter map 等返回的是 StatelessOp 或者 statefulOp . 多个 stage 组合称为双向链表的形式 从而成了整个流水线

有了流水线,相邻两个操作阶段之间如何协调运算?

于是又有了 Sink 的概念,又来协调相邻的 stage 之间计算运行

他的模式是 begin accept end 还有短路标记

他的 accept 就是封装了回调方法,所以说每个操作 stage , StatelessOp 或者 statefulOp 中又封装了 Sink . 通过 AbstractPipeline 提供的 opWrapSink 方法可以获取这个 Sink

调用这个 sinkaccept 方法就可以调用当前操作的方法

那么如何串联起来呢?

关键点在于 opWrapSink 方法,他接收一个 Sink 作为参数,在调用 accept 方法中。可以调用这个入参 Sinkaccept 方法

这样子从当前就能调用下一个,也就是说有了推动的动作。那么只需要找到开始,每个处理了之后都推动下一个,就顺序完成了所欲的操作了。

# 结语

通过看 Stream 相关的知识点,发现一篇文章是没法讲清楚的。

这一次,我又果不其然的留下了两篇文章

请给我记代办~

在分享 并行计算流 的时候,我们需要以 JDK1.7 中的 forkJoin 框架为前提,来分析 StreamparallelStream .

在分享 迭代器 的时候,我们也会分析一下 JDK 中提供的 普通迭代器,比如 ForEach , iterator , 以及 Stream 的高级迭代器 spliterator . 也会由浅入深的分析一下,各种迭代器的优缺点。 也会自定义实现一个迭代器。

敬请期待,防止走丢见文末。关注我,期望和你一起遇见更好的自己.

# 最后

期望和你一起遇见更好的自己