From InputStream to parallel Stream<T>
Question
I receive an InputStream that contains multiple elements. They are scanned, parsed, and iterated serially by a Stream<T> (in the same order they appear in the InputStream), and then persisted to a database. This works fine.
Now I am trying to iterate the Stream<T> in parallel with Stream<T>.parallel(), so that while one thread is blocked persisting an element, other threads can keep scanning the InputStream and persisting.
To do so, I called Stream<T>.parallel() on the resulting Stream<MyElement>. To check that the parallelization works, I added a map step that introduces a random delay for each element. I expected the resulting elements to be printed in random order.
However, that is not what happens: the elements are still printed in file order.
Is there a way to properly iterate this stream in parallel?
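import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Scanner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test; // assuming JUnit 5; use org.junit.Test for JUnit 4

// MyElement and MyElementParser are the asker's own types (not shown).
public class FromInputStreamToParallelStream {
    public static Stream<MyElement> getStream(InputStream is) {
        // The Scanner must stay open until the lazy stream is consumed, so it is
        // not closed here. Scanner.tokens() registers scanner.close() as a close
        // handler, so closing the returned stream also closes the Scanner.
        var scanner = new Scanner(is);
        return scanner
                .useDelimiter("DELIMITER")
                .tokens()
                .parallel()
                .map(MyElementParser::parse);
    }

    @Test
    public void test() throws IOException {
        // Closing the stream after consumption closes the Scanner as well.
        try (InputStream in = Files.newInputStream(Paths.get("my-file.xml"));
             Stream<MyElement> elements = getStream(in)) {
            elements
                    .map(FromInputStreamToParallelStream::sleepRandom)
                    .forEach(System.out::println);
        }
    }

    // Sleeps 0-9 seconds so that any parallel execution shows up in the output order.
    private static MyElement sleepRandom(MyElement element) {
        var randomNumber = ThreadLocalRandom.current().nextInt(10);
        System.out.println("wait. " + randomNumber);
        try {
            TimeUnit.SECONDS.sleep(randomNumber);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return element;
    }
}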
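A likely reason for the file-order output, worth verifying on your JDK: in OpenJDK (9+), Scanner.tokens() is backed by a Spliterators.AbstractSpliterator of unknown size. Its trySplit() copies the next 1024 elements into an array, then the next 2048, then 3072, and so on. Because the estimated size is unknown, the parallel stream's split threshold ends up so large that it generally does not split those arrays further, so a file with fewer than 1024 elements is processed as a single task on a single thread. The sketch below (ScannerBatchDemo and batchSizes are hypothetical names) prints the batch sizes for two synthetic inputs; the printed values reflect OpenJDK's implementation, not a guarantee of the Spliterator contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ScannerBatchDemo {

    // Returns the sizes of the chunks that trySplit() hands out for a
    // Scanner.tokens() stream over tokenCount whitespace-separated tokens.
    public static List<Long> batchSizes(int tokenCount) {
        String input = IntStream.range(0, tokenCount)
                .mapToObj(Integer::toString)
                .collect(Collectors.joining(" "));
        List<Long> sizes = new ArrayList<>();
        try (Scanner scanner = new Scanner(input)) {
            // Source spliterator of unknown size (estimateSize() == Long.MAX_VALUE).
            Spliterator<String> source = scanner.tokens().spliterator();
            Spliterator<String> batch;
            // Each trySplit() buffers the next batch into an array-backed spliterator.
            while ((batch = source.trySplit()) != null) {
                sizes.add(batch.estimateSize());
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(10));   // [10]  (one batch: everything on one task)
        System.out.println(batchSizes(5000)); // [1024, 2048, 1928]
    }
}
```

With ten elements, the only batch holds all ten, which matches the sequential-looking output described above.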
I guess I'm going to need to implement my own Spliterator<T>.
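A custom Spliterator<T> along those lines could hand out small, fixed-size batches instead of the growing 1024/2048/... batches. The sketch below is one common way to do it, assuming a sequential source; FixedBatchSpliterator and withBatchSize are hypothetical names, not JDK classes. It reports an unknown size and, on every trySplit(), copies at most batchSize elements into an array-backed spliterator that a worker thread can pick up.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper (not a JDK class): wraps a sequential source spliterator and
// hands out batches of at most batchSize elements on every trySplit() call.
public class FixedBatchSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

    private final Spliterator<T> source;
    private final int batchSize;

    public FixedBatchSpliterator(Spliterator<T> source, int batchSize) {
        // Report an unknown size and drop SIZED/SUBSIZED/SORTED, which the
        // wrapper cannot guarantee; flags such as ORDERED and NONNULL are kept.
        super(Long.MAX_VALUE, source.characteristics()
                & ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.SORTED));
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public Spliterator<T> trySplit() {
        Object[] buffer = new Object[batchSize];
        int[] count = {0};
        Consumer<T> collect = e -> buffer[count[0]++] = e;
        // Pull up to batchSize elements from the source into the buffer.
        while (count[0] < batchSize && source.tryAdvance(collect)) {
            // keep filling the buffer
        }
        if (count[0] == 0) {
            return null; // source exhausted: nothing left to split off
        }
        // Array-backed, SIZED chunk that the parallel stream processes as one task.
        return Spliterators.spliterator(buffer, 0, count[0], characteristics());
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return source.tryAdvance(action);
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        source.forEachRemaining(action);
    }

    // Re-streams `in` in parallel with fixed-size batches; closing the result
    // also closes `in` (e.g. a Scanner.tokens() stream and its Scanner).
    public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
        return StreamSupport.stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true)
                .onClose(in::close);
    }

    public static void main(String[] args) {
        // Batch size 1: each element can be processed by a different worker thread.
        long sum = withBatchSize(IntStream.rangeClosed(1, 10).boxed(), 1)
                .mapToLong(Integer::longValue)
                .sum();
        System.out.println(sum); // 55
    }
}
```

In the question's getStream(), this would look like FixedBatchSpliterator.withBatchSize(scanner.useDelimiter("DELIMITER").tokens(), 1).map(MyElementParser::parse). A batch size of 1 suits slow, blocking per-element work such as a database insert. Keep in mind that parallel streams run on the common ForkJoinPool, whose default parallelism is derived from the number of available processors, so blocking calls still cap the throughput.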
Thanks in advance.