java8 streams
在Data Geekery ,我们喜欢Java。 而且,由于我们真的很喜欢jOOQ的流畅的API和查询DSL ,我们对Java 8将为我们的生态系统带来什么感到非常兴奋。
Java 8星期五
每个星期五,我们都会向您展示一些不错的教程风格的Java 8新功能,这些功能利用了lambda表达式,扩展方法和其他出色的功能。 您可以在GitHub上找到源代码 。
使用Streams API时的10个细微错误
我们已经完成了所有SQL错误列表:
- Java开发人员在编写SQL时常犯的10个错误
- Java开发人员在编写SQL时犯的10个常见错误
- Java开发人员在编写SQL时再犯的10个常见错误(您不会相信最后一个)
但是我们还没有用Java 8列出前10个错误列表! 在今天的场合( 13日星期五 ),我们将赶上您使用Java 8时应用程序中出现的问题(这不会发生在我们身上,因为我们将Java 6留在了另一个Java 6上)而)。
1.意外重用流
想打赌,这至少每个人都会发生一次。 像现有的“流”(例如InputStream
)一样,您只能使用一次流。 以下代码不起作用:
IntStream stream = IntStream.of(1, 2);
stream.forEach(System.out::println);// That was fun! Let's do it again!
stream.forEach(System.out::println);
您将获得:
java.lang.IllegalStateException: stream has already been operated upon or closed
因此在使用流时要小心。 只能执行一次。
2.意外创建“无限”流
您无需注意即可轻松创建无限流。 请看以下示例:
// Will run indefinitely
IntStream.iterate(0, i -> i + 1).forEach(System.out::println);
如果您将流设计为无限的,那么流的全部要点就是事实。 唯一的问题是,您可能不需要这样做。 因此,请确保始终设置适当的限制:
// That's better
IntStream.iterate(0, i -> i + 1).limit(10).forEach(System.out::println);
3.意外地创建“微妙”的无限流
我们不能这么说。 您最终将意外地创建无限流。 以以下流为例:
IntStream.iterate(0, i -> ( i + 1 ) % 2).distinct().limit(10).forEach(System.out::println);
所以…
- 我们生成交替的0和1
- 那么我们只保留不同的值,即单个0和单个1
- 那么我们将流的大小限制为10
- 然后我们消耗它
好吧…… distinct()
操作不知道提供给iterate()
方法的函数只会产生两个不同的值。 它可能会期望更多。 因此它将永远消耗流中的新值,并且永远不会达到limit(10)
。 不幸的是,您的应用程序停顿了。
4.意外地创建“微妙”的并行无限流
我们确实需要坚持,您可能会意外地尝试消耗无限的流。 让我们假设您认为 distinct()
操作应并行执行。 您可能正在编写:
IntStream.iterate(0, i -> ( i + 1 ) % 2).parallel().distinct().limit(10).forEach(System.out::println);
现在,我们已经看到,这种情况将永远发生。 但至少在以前,您仅消耗计算机上的一个CPU。 现在,您可能会消耗其中的四个,可能会意外地无限消耗流,从而几乎占据整个系统。 真不好 之后,您可能可以硬重启服务器/开发计算机。 在爆炸之前,最后查看一下我的笔记本电脑的外观:
5.混合操作顺序
那么,为什么我们坚持要您绝对意外地创建无限流? 这很简单。 因为您可能只是偶然地这样做。 如果您切换limit()
和distinct()
的顺序,则可以完美地使用上述流:
IntStream.iterate(0, i -> ( i + 1 ) % 2).limit(10).distinct().forEach(System.out::println);
现在产生:
0
1
为什么? 因为我们首先将无限流限制为10个值(0 1 0 1 0 1 0 1 0 1),然后再将有限流减小为无限个流中包含的不同值(0 1)。
当然,这在语义上可能不再正确,因为您确实希望从一组数据中获得前10个不同的值(您刚好“忘记”了数据是无限的)。 没有人真正想要10个随机值,然后才将它们减小到与众不同。
如果您来自SQL背景,则可能不会期望有这种差异。 以SQL Server 2012为例。 以下两个SQL语句相同:
-- Using TOP
SELECT DISTINCT TOP 10 *
FROM i
ORDER BY ..-- Using FETCH
SELECT *
FROM i
ORDER BY ..
OFFSET 0 ROWS
FETCH NEXT 10 ROWS ONLY
因此,作为SQL专家,您可能没有意识到流操作顺序的重要性。
6.再次混合操作顺序
说到SQL,如果您是MySQL或PostgreSQL人,则可能会习惯LIMIT .. OFFSET
子句。 SQL充满了微妙的怪癖,这就是其中之一。 该OFFSET
子句应用首先 ,在SQL Server 2012中的(即建议的SQL:2008标准的)语法。
如果将MySQL / PostgreSQL方言直接转换为流,则可能会出错:
IntStream.iterate(0, i -> i + 1).limit(10) // LIMIT.skip(5) // OFFSET.forEach(System.out::println);
以上收益
5
6
7
8
9
是。 它不会在9
之后继续,因为现在先应用limit()
,生成(0 1 2 3 4 5 6 7 8 9)。 之后应用skip()
,将流减少到(5 6 7 8 9)。 不是您可能想要的。
注意LIMIT .. OFFSET
与"OFFSET .. LIMIT"
陷阱!
7.使用过滤器遍历文件系统
以前我们已经在博客上写过 。 似乎一个好主意是使用过滤器遍历文件系统:
Files.walk(Paths.get(".")).filter(p -> !p.toFile().getName().startsWith(".")).forEach(System.out::println);
上面的流似乎仅在非隐藏目录(即不以点开头的目录)中移动。 不幸的是,您再次犯了#5和#6错误。 walk()
已经生成了当前目录的整个子目录流。 虽然懒惰,但逻辑上包含所有子路径。 现在,过滤器将正确过滤出名称以点“。”开头的路径。 例如.git
或.idea
将不属于结果流。 但是这些路径将是: .\.git\refs
或.\.idea\libraries
。 不是你想要的。
现在,不要通过编写以下内容解决此问题:
Files.walk(Paths.get(".")).filter(p -> !p.toString().contains(File.separator + ".")).forEach(System.out::println);
尽管这将产生正确的输出,但仍将通过遍历完整的目录子树,然后递归到“隐藏”目录的所有子目录来实现。
我猜您将不得不再次使用旧的JDK 1.0 File.list()
。 好消息是, FilenameFilter
和FileFilter
都是功能接口。
8.修改流的后备集合
在迭代List
,一定不要在迭代主体中修改相同的列表。 在Java 8之前确实如此,但是对于Java 8流,它可能变得更加棘手。 考虑以下来自0..9的列表:
// Of course, we create this list using streams:
List<Integer> list =
IntStream.range(0, 10).boxed().collect(toCollection(ArrayList::new));
现在,假设我们要在使用每个元素时将其删除:
list.stream()// remove(Object), not remove(int)!.peek(list::remove).forEach(System.out::println);
有趣的是,这将适用于某些元素! 您可能获得的输出是以下内容:
0
2
4
6
8
null
null
null
null
null
java.util.ConcurrentModificationException
如果我们在捕获到该异常之后对列表进行了自省,那么将会发现一个有趣的发现。 我们会得到:
[1, 3, 5, 7, 9]
嘿,它对所有奇数都有效。 这是一个错误吗? 不,它看起来像个功能。 如果您正在研究JDK代码,则可以在ArrayList.ArraListSpliterator
找到以下注释:
/** If ArrayLists were immutable, or structurally immutable (no* adds, removes, etc), we could implement their spliterators* with Arrays.spliterator. Instead we detect as much* interference during traversal as practical without* sacrificing much performance. We rely primarily on* modCounts. These are not guaranteed to detect concurrency* violations, and are sometimes overly conservative about* within-thread interference, but detect enough problems to* be worthwhile in practice. To carry this out, we (1) lazily* initialize fence and expectedModCount until the latest* point that we need to commit to the state we are checking* against; thus improving precision. (This doesn't apply to* SubLists, that create spliterators with current non-lazy* values). (2) We perform only a single* ConcurrentModificationException check at the end of forEach* (the most performance-sensitive method). When using forEach* (as opposed to iterators), we can normally only detect* interference after actions, not before. Further* CME-triggering checks apply to all other possible* violations of assumptions for example null or too-small* elementData array given its size(), that could only have* occurred due to interference. This allows the inner loop* of forEach to run without any further checks, and* simplifies lambda-resolution. While this does entail a* number of checks, note that in the common case of* list.stream().forEach(a), no checks or other computation* occur anywhere other than inside forEach itself. The other* less-often-used methods cannot take advantage of most of* these streamlinings.*/
现在,检查当我们告诉流产生sorted()
结果时会发生什么:
list.stream().sorted().peek(list::remove).forEach(System.out::println);
现在将产生以下“预期”输出
0
1
2
3
4
5
6
7
8
9
和流消费后的清单? 它是空的:
[]
因此,所有元素都将被消耗并正确删除。 sorted()
操作是“有状态中间操作” ,这意味着后续操作不再对后备集合进行操作,而是对内部状态进行操作。 现在从列表中删除元素是“安全的”!
好吧,我们真的可以吗? 让我们继续进行parallel()
, sorted()
移除:
list.stream().sorted().parallel().peek(list::remove).forEach(System.out::println);
现在产生:
7
6
2
5
8
4
1
0
9
3
并且列表包含
[8]
真是的 我们没有删除所有元素! 解决此流难题的任何人都可以免费获得啤酒( 和jOOQ贴纸 )!
这一切看起来都是相当随机和微妙的,我们只能建议您在使用流时不要真正修改后备集合。 就是行不通。
9.忘记实际消耗流
您认为以下信息流有什么作用?
IntStream.range(1, 5).peek(System.out::println).peek(i -> { if (i == 5) throw new RuntimeException("bang");});
阅读此书时,您可能会认为它将打印(1 2 3 4 5),然后引发异常。 但这是不正确的。 它什么也不会做。 流只是坐在那里,从未被消耗过。
与任何流畅的API或DSL一样,您实际上可能会忘记调用“终端”操作。 当您使用peek()
时尤其如此,因为peek()
与forEach()
非常相似。
当您忘记调用execute()
或fetch()
时, jOOQ可能会发生相同的情况:
DSL.using(configuration).update(TABLE).set(TABLE.COL1, 1).set(TABLE.COL2, "abc").where(TABLE.ID.eq(3));
哎呀。 没有execute()
是的,“最佳”方法-1-2次警告!
10.并行流死锁
现在这才是真正的礼物!
如果您未正确同步所有事物,则所有并发系统都可能陷入死锁。 虽然找不到现实的例子很明显,但找到强制的例子很明显。 保证下面的parallel()
流会陷入死锁:
Object[] locks = { new Object(), new Object() };IntStream.range(1, 5).parallel().peek(Unchecked.intConsumer(i -> {synchronized (locks[i % locks.length]) {Thread.sleep(100);synchronized (locks[(i + 1) % locks.length]) {Thread.sleep(50);}}})).forEach(System.out::println);
请注意Unchecked.intConsumer()
的使用,该函数将功能性IntConsumer
接口转换为org.jooq.lambda.fi.util.function.CheckedIntConsumer
,允许抛出已检查的异常。
好。 您的机器运气不好。 这些线程将永远被阻塞!
好消息是,用Java编写死锁的教科书示例从未如此简单!
有关更多详细信息,另请参见Brian Goetz对Stack Overflow的此问题的回答 。
结论
借助流和功能性思维,我们将遇到大量新的,细微的错误。 这些错误很少可以预防,除非通过实践和保持专注。 您必须考虑如何订购您的手术。 您必须考虑流是否可能是无限的。
流(和lambda)是一个非常强大的工具。 但是首先需要掌握的工具。
翻译自: https://www.javacodegeeks.com/2014/06/java-8-friday-10-subtle-mistakes-when-using-the-streams-api.html
java8 streams