Java 的集合框架(如 List 和 Map 接口及 Arraylist 和 HashMap 类)让我们很容易地管理有序和无序集合。集合框架自引入的第一天起就在持续的改进。在 Java 8 中,我们可以通过流的 API 来管理、遍历和聚合集合。流用一种全新的方式,将数据作为一个整体,而不是单独的个体来处理。集合专注的是数据,流专注的是计算。可以直接从一个集合创建一个流,然后就能用这个流来完成许多事情,如遍历、过滤及聚合。Java 8 中有两种集合流:串行流和并行流。

从集合或数组创建流

两种简单方式:

1
2
3
4
5
6
7
8
List<person> people1 = new ArrayList<>(...);
Person [] people2 = {...}

Stream A = people1.stream();
Stream B = Stream.of(people1);

Stream C = Arrays.stream(people2);
Stream D = Stream.of(people12);

无论是 Stream.of() 还是 Arrays.stream(),所做的事情实质上是一样的:都是从一个基本类型或者复合对象类型的数组转换为流对象。

无限流:

1
2
Stream<Double> infStream1 = Stream.iterate(0., d->d);
Stream<Double> infStream2 = Stream.generate(()->Math.random());

流被创建后并不会直接将整个流的内容加载到内存中,而是在整个流触发终止操作时才会一次性将整个流操作全部执行。

操作流

中间操作
  • 筛选 filter(Predicate),distinct()
  • 切割 limit(long),skip(long)
  • 映射 map(Function),flatMap(Function)
  • 排序 sorted(),sorted(Comparator)
终止操作
  • 查找 findFirst(),findAny(),count(),max(Comparator),min(Comparator),forEach(Consumer)
  • 匹配 allMatch(Predicate),anyMatch(Predicate),noneMatch(Predicate)
  • 收集 collect(Collector)
  • 归约 reduce(T, BinaryOperator),reduce(BinaryOperator)

把流看作是一条对数据处理的流水线的话,中间操作相当于对数据的筛选与加工,终止操作则是对之前操作的结果进行查找或收集。

使用 Collectors 收集结果
  • 收集成 Collection toList(), toSet(), toCollection()
  • 计算均值、最值、和、统计值 averagingX(), maxBy(), minBy(), summarizingX(), summingX()
  • 分组收集 groupingBy(), partitioningBy()
使用 reduce 组合结果

这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。不指定起始值时会把 Stream 的前面两个元素组合起来,返回 Optional 类型对象。从这个意义上说,字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就相当于

1
2
Integer sum = integers.reduce(0, (a, b) -> a+b); 
Integer sum = integers.reduce(0, Integer::sum);

reduce() 用例

1
2
3
4
5
6
7
8
9
10
11
12
// 字符串连接,concat = "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
// 求和,sumValue = 10, 有起始值
int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
// 求和,sumValue = 10, 无起始值
sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
// 过滤,字符串连接,concat = "ace"
concat = Stream.of("a", "B", "c", "D", "e", "F").
filter(x -> x.compareTo("Z") > 0).
reduce("", String::concat);

串行/并行 测试时遇到的问题

使用并行化流相比使用串行化流的速度提升远高于理论值;先执行并行化流的计算再执行串行化流的计算结果耐人寻味。
可能:速度提升主要来源于编译器/CPU/JVM执行过程优化?简单类型集合的流串并行执行耗时差别小?
代码:

1
2
3
4
5
6
7
8
9
10
long begin = System.currentTimeMillis();
List<Long> ll = LongStream.rangeClosed(1, 30000000L).boxed().collect(Collectors.toList());
long inited = System.currentTimeMillis();
long result1 = ll.stream().reduce(0L, Long::sum);
long seq = System.currentTimeMillis();
long result2 = ll.parallelStream().reduce(0L, Long::sum);
long par = System.currentTimeMillis();
System.out.println(result1 == result2);
System.out.println("init:" + (inited - begin) + ", seq:" + (seq - inited) + ", par:" + (par - seq));
System.out.println((seq - inited) / (par - seq) + " times faster");

输出1:

1
2
3
true
init:13478, seq:7613, par:265
28 times faster

交换代码中串行和并行处理的顺序后,得到输出2:

1
2
3
true
init:16068, par:6458, seq:218
29 times faster