Skip to content

Commit 4cae1fc

Browse files
authored
Merge pull request lingcoder#153 from blackwatchcup/master
Parallel Streams
2 parents 54f7e9f + af1c0c1 commit 4cae1fc

File tree

1 file changed

+185
-2
lines changed

1 file changed

+185
-2
lines changed

docs/book/24-Concurrent-Programming.md

Lines changed: 185 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ Java 8 CompletableFuture是一个更好的解决方案:它允许您将操作
279279
<!-- Parallel Streams -->
280280
## 并行流
281281

282-
Java 8流的一个显着优点是,在某些情况下,它们可以很容易地并行化。这来自仔细的库设计,特别是流使用内部迭代的方式 - 也就是说,它们控制着自己的迭代器。特别是,它们使用一种特殊的迭代器,称为Spliterator,它被限制为易于自动分类。这产生了相当神奇的结果,只能说.parallel(),并且你的流中的所有东西都是作为一组并行任务运行的。如果您的代码是使用Streams编写的,那么并行化以提高速度似乎微不足道。
282+
Java 8流的一个显着优点是,在某些情况下,它们可以很容易地并行化。这来自仔细的库设计,特别是流使用内部迭代的方式 - 也就是说,它们控制着自己的迭代器。特别是,他们使用一种特殊的迭代器,称为Spliterator,它被限制为易于自动分割。这产生了相当神奇的结果,即能够简单用parallel()然后流中的所有内容都作为一组并行任务运行。如果您的代码是使用Streams编写的,那么并行化以提高速度似乎是一种琐事
283283

284284
例如,考虑来自Streams的Prime.java。查找质数可能是一个耗时的过程,我们可以看到该程序的计时:
285285

@@ -317,7 +317,190 @@ public class ParallelPrime {
317317
*/
318318
```
319319

320-
请注意,这不是微基准测试,因为我们计时整个程序。我们将数据保存在磁盘上以防止激进的优化;如果我们没有对结果做任何事情,那么一个狡猾的编译器可能会观察到程序没有意义并且消除了计算(这不太可能,但并非不可能)。请注意使用nio2库编写文件的简单性(在[文件](./17-Files.md)一章中有描述)。
320+
请注意,这不是微基准测试,因为我们计时整个程序。我们将数据保存在磁盘上以防止过激的优化;如果我们没有对结果做任何事情,那么一个高级的编译器可能会观察到程序没有意义并且消除了计算(这不太可能,但并非不可能)。请注意使用nio2库编写文件的简单性(在[文件](./17-Files.md)一章中有描述)。
321+
322+
当我注释掉[1] parallel()行时,我的结果大约是parallel()的三倍。
323+
324+
并行流似乎是一个甜蜜的交易。您所需要做的就是将编程问题转换为流,然后插入parallel()以加快速度。实际上,有时候这很容易。但遗憾的是,有许多陷阱。
325+
326+
- parallel()不是灵丹妙药
327+
328+
作为对流和并行流的不确定性的探索,让我们看一个看似简单的问题:求和数字的增量序列。事实证明这是一个令人惊讶的数量,并且我将冒险将它们进行比较 - 试图小心,但承认我可能会在计时代码执行时遇到许多基本陷阱之一。结果可能有一些缺陷(例如JVM没有“升温”),但我认为它仍然提供了一些有用的指示。
329+
330+
我将从一个计时方法rigorously 开始,它采用**LongSupplier**,测量**getAsLong()**调用的长度,将结果与**checkValue**进行比较并显示结果。
331+
332+
请注意,一切都必须严格使用**long**;我花了一些时间发现隐蔽的溢出,然后才意识到在重要的地方错过了**long**
333+
334+
所有关于时间和内存的数字和讨论都是指“我的机器”。你的经历可能会有所不同。
335+
336+
```java
337+
// concurrent/Summing.java
338+
import java.util.stream.*;
339+
import java.util.function.*;
340+
import onjava.Timer;
341+
public class Summing {
342+
static void timeTest(String id, long checkValue, LongSupplier operation){
343+
System.out.print(id + ": ");
344+
Timer timer = newTimer();
345+
long result = operation.getAsLong();
346+
if(result == checkValue)
347+
System.out.println(timer.duration() + "ms");
348+
else
349+
System.out.format("result: %d%ncheckValue: %d%n", result, checkValue);
350+
}
351+
public static final int SZ = 100_000_000;// This even works://
352+
public static final int SZ = 1_000_000_000;
353+
public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; // Gauss's formula
354+
public static void main(String[] args){
355+
System.out.println(CHECK);
356+
timeTest("Sum Stream", CHECK, () ->
357+
LongStream.rangeClosed(0, SZ).sum());
358+
timeTest("Sum Stream Parallel", CHECK, () ->
359+
LongStream.rangeClosed(0, SZ).parallel().sum());
360+
timeTest("Sum Iterated", CHECK, () ->
361+
LongStream.iterate(0, i -> i + 1)
362+
.limit(SZ+1).sum());
363+
// Slower & runs out of memory above 1_000_000:
364+
// timeTest("Sum Iterated Parallel", CHECK, () ->
365+
// LongStream.iterate(0, i -> i + 1)
366+
// .parallel()
367+
// .limit(SZ+1).sum());
368+
}
369+
}
370+
/* Output:5000000050000000
371+
Sum Stream: 167ms
372+
Sum Stream Parallel: 46ms
373+
Sum Iterated: 284ms
374+
*/
375+
```
376+
377+
**CHECK**值是使用Carl Friedrich Gauss在1700年代后期仍在小学时创建的公式计算出来的.
378+
379+
**main()** 的第一个版本使用直接生成 **Stream** 并调用 **sum()** 的方法。我们看到流的好处在于十亿分之一的SZ在没有溢出的情况下处理(我使用较小的数字,因此程序运行时间不长)。使用 **parallel()** 的基本范围操跟快。
380+
381+
如果使用**iterate()**来生成序列,则减速是戏剧性的,可能是因为每次生成数字时都必须调用lambda。但是如果我们尝试并行化,那么结果通常比非并行版本花费的时间更长,但是当**SZ**超过一百万时,它也会耗尽内存(在某些机器上)。当然,当你可以使用**range()**时,你不会使用**iterate()**,但如果你生成的东西不是简单的序列,你必须使用**iterate()**。应用**parallel()**是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察:
382+
383+
- 流并行性将输入数据分成多个部分,因此算法可以应用于那些单独的部分。
384+
- 阵列分割成本低廉,均匀且具有完美的分裂知识。
385+
- 链接列表没有这些属性;“拆分”一个链表仅仅意味着把它分成“第一元素”和“其余列表”,这相对无用。
386+
- 无状态生成器的行为类似于数组;使用上述范围是无可争议的。
387+
- 迭代生成器的行为类似于链表; **iterate()** 是一个迭代生成器。
388+
389+
现在让我们尝试通过在数组中填充值来填充数组来解决问题。因为数组只分配了一次,所以我们不太可能遇到垃圾收集时序问题。
390+
391+
首先我们将尝试一个充满原始**long**的数组:
392+
393+
```java
394+
// concurrent/Summing2.java
395+
// {ExcludeFromTravisCI}import java.util.*;
396+
public class Summing2 {
397+
static long basicSum(long[] ia) {
398+
long sum = 0;
399+
int size = ia.length;
400+
for(int i = 0; i < size; i++)
401+
sum += ia[i];return sum;
402+
}
403+
// Approximate largest value of SZ before
404+
// running out of memory on mymachine:
405+
public static final int SZ = 20_000_000;
406+
public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
407+
public static void main(String[] args) {
408+
System.out.println(CHECK);
409+
long[] la = newlong[SZ+1];
410+
Arrays.parallelSetAll(la, i -> i);
411+
Summing.timeTest("Array Stream Sum", CHECK, () ->
412+
Arrays.stream(la).sum());
413+
Summing.timeTest("Parallel", CHECK, () ->
414+
Arrays.stream(la).parallel().sum());
415+
Summing.timeTest("Basic Sum", CHECK, () ->
416+
basicSum(la));// Destructive summation:
417+
Summing.timeTest("parallelPrefix", CHECK, () -> {
418+
Arrays.parallelPrefix(la, Long::sum)
419+
return la[la.length - 1];
420+
});
421+
}
422+
}
423+
/* Output:200000010000000
424+
Array Stream
425+
Sum: 104ms
426+
Parallel: 81ms
427+
Basic Sum: 106ms
428+
parallelPrefix: 265ms
429+
*/
430+
```
431+
432+
第一个限制是内存大小;因为数组是预先分配的,所以我们不能创建几乎与以前版本一样大的任何东西。并行化可以加快速度,甚至比使用 **basicSum()** 循环更快。有趣的是, **Arrays.parallelPrefix()** 似乎实际上减慢了速度。但是,这些技术中的任何一种在其他条件下都可能更有用 - 这就是为什么你不能做出任何确定性的声明,除了“你必须尝试一下”。”
433+
434+
最后,考虑使用盒装**Long**的效果:
435+
436+
```java
437+
// concurrent/Summing3.java
438+
// {ExcludeFromTravisCI}
439+
import java.util.*;
440+
public class Summing3 {
441+
static long basicSum(Long[] ia) {
442+
long sum = 0;
443+
int size = ia.length;
444+
for(int i = 0; i < size; i++)
445+
sum += ia[i];
446+
return sum;
447+
}
448+
// Approximate largest value of SZ before
449+
// running out of memory on my machine:
450+
public static final int SZ = 10_000_000;
451+
public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
452+
public static void main(String[] args) {
453+
System.out.println(CHECK);
454+
Long[] aL = newLong[SZ+1];
455+
Arrays.parallelSetAll(aL, i -> (long)i);
456+
Summing.timeTest("Long Array Stream Reduce", CHECK, () ->
457+
Arrays.stream(aL).reduce(0L, Long::sum));
458+
Summing.timeTest("Long Basic Sum", CHECK, () ->
459+
basicSum(aL));
460+
// Destructive summation:
461+
Summing.timeTest("Long parallelPrefix",CHECK, ()-> {
462+
Arrays.parallelPrefix(aL, Long::sum);
463+
return aL[aL.length - 1];
464+
});
465+
}
466+
}
467+
/* Output:50000005000000
468+
Long Array
469+
Stream Reduce: 1038ms
470+
Long Basic
471+
Sum: 21ms
472+
Long parallelPrefix: 3616ms
473+
*/
474+
```
475+
476+
现在可用的内存量大约减半,并且所有情况下所需的时间都会很长,除了**basicSum()**,它只是循环遍历数组。令人惊讶的是, **Arrays.parallelPrefix()** 比任何其他方法都要花费更长的时间。
477+
478+
我将 **parallel()** 版本分开了,因为在上面的程序中运行它导致了一个冗长的垃圾收集,扭曲了结果:
479+
480+
```java
481+
// concurrent/Summing4.java
482+
// {ExcludeFromTravisCI}
483+
import java.util.*;
484+
public class Summing4 {
485+
public static void main(String[] args) {
486+
System.out.println(Summing3.CHECK);
487+
Long[] aL = newLong[Summing3.SZ+1];
488+
Arrays.parallelSetAll(aL, i -> (long)i);
489+
Summing.timeTest("Long Parallel",
490+
Summing3.CHECK, () ->
491+
Arrays.stream(aL)
492+
.parallel()
493+
.reduce(0L,Long::sum));
494+
}
495+
}
496+
/* Output:50000005000000
497+
Long Parallel: 1014ms
498+
*/
499+
```
500+
501+
它比非parallel()版本略快,但并不显着。
502+
503+
这种时间增加的一个重要原因是处理器内存缓存。使用**Summing2.java**中的原始**long**,数组**la**是连续的内存。处理器可以更容易地预测该阵列的使用,并使缓存充满下一个需要的阵列元素。访问缓存比访问主内存快得多。似乎 **Long parallelPrefix** 计算受到影响,因为它为每个计算读取两个数组元素,并将结果写回到数组中,并且每个都为**Long**生成一个超出缓存的引用。
321504

322505
<!-- Creating and Running Tasks -->
323506
## 创建和运行任务

0 commit comments

Comments
 (0)