From InputStream to parallel Stream<T>

Question

I receive an InputStream that contains multiple elements. They are scanned, parsed, and iterated by a Stream<T> serially (in the same order they have in the InputStream), and then persisted in a DB. This works fine.

Now I am trying to iterate the Stream<T> in parallel with Stream<T>.parallel(), so that while one thread is blocked persisting, the others can keep scanning the InputStream and persisting.

So I tried to parallelize the resulting Stream<MyElement> with Stream<T>.parallel(). To check that the parallelization works, I added a map function to the stream that adds a random delay. I was expecting the resulting elements to be printed in a random order.

But the result is not what I expected: the elements are still shown in file order.

Is there a way to properly iterate this stream in parallel?

public class FromInputStreamToParallelStream {

    public static Stream<MyElement> getStream(InputStream is) {
        try (var scanner = new Scanner(is)) {
            return scanner
                    .useDelimiter("DELIMITER")
                    .tokens()
                    .parallel()
                    .map(MyElementParser::parse);
        }
    }

    @Test
    public void test() throws IOException {
        try (InputStream in = Files.newInputStream(Paths.get("my-file.xml"))) {
            getStream(in)
                    .map(FromInputStreamToParallelStream::sleepRandom)
                    .forEach(System.out::println);
        }
    }

    private static MyElement sleepRandom(MyElement element) {
        var randomNumber = new Random().nextInt(10);
        System.out.println("wait. " + randomNumber);

        try {
            TimeUnit.SECONDS.sleep(randomNumber);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return element;
    }
}
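
Output order alone can be a misleading signal here, so a more direct check is to record which threads actually process the elements. The following is a small sketch for that; ThreadProbe and threadsUsed are hypothetical names, not part of the code above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

// Hypothetical helper for diagnosing whether a stream really runs on several threads.
final class ThreadProbe {

    // Consumes the stream and returns the names of the threads that processed its elements.
    static <T> Set<String> threadsUsed(Stream<T> stream) {
        // Concurrent set, because forEach on a parallel stream calls the lambda from many threads.
        Set<String> names = ConcurrentHashMap.newKeySet();
        stream.forEach(element -> names.add(Thread.currentThread().getName()));
        return names;
    }
}
```

For example, if ThreadProbe.threadsUsed(getStream(in)) returned a single name, that would suggest the pipeline effectively ran on one thread, whatever order the elements were printed in.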

I guess I will need to implement my own Spliterator<T>.

Thanks in advance.
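
A custom Spliterator<T> of the kind guessed at above could look roughly like the following. This is a minimal sketch, not a confirmed fix: SmallBatchSpliterator, parallelize, and batchSize are hypothetical names introduced here. It rests on the assumption that the stream returned by Scanner.tokens() has no known size and therefore splits in batches too large for a small file to spread across threads. The wrapper hands out a small prefix batch on every split, so that each batch can be picked up by a different worker.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical wrapper: splits a sequential source into small fixed-size batches.
final class SmallBatchSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> source;
    private final int batchSize;
    private final int characteristics;

    SmallBatchSpliterator(Spliterator<T> source, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.source = source;
        this.batchSize = batchSize;
        // Keep only flags that still hold for the wrapper; the size is treated as unknown.
        this.characteristics = source.characteristics() & (ORDERED | DISTINCT | NONNULL);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return source.tryAdvance(action);
    }

    @Override
    public Spliterator<T> trySplit() {
        // Pull up to batchSize elements off the front of the source.
        Object[] batch = new Object[batchSize];
        int[] count = {0};
        while (count[0] < batchSize && source.tryAdvance(e -> batch[count[0]++] = e)) {
            // loop body intentionally empty: the lambda stores each element
        }
        if (count[0] == 0) {
            return null; // source exhausted, nothing left to split off
        }
        // The returned prefix is an array-backed spliterator with a known size.
        return Spliterators.spliterator(batch, 0, count[0], characteristics);
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // unknown
    }

    @Override
    public int characteristics() {
        return characteristics;
    }

    // Wraps a sequential stream into a parallel one that splits in small batches.
    static <T> Stream<T> parallelize(Stream<T> sequential, int batchSize) {
        Spliterator<T> batching = new SmallBatchSpliterator<>(sequential.spliterator(), batchSize);
        return StreamSupport.stream(batching, true).onClose(sequential::close);
    }
}
```

With the question's code, the call would be something like SmallBatchSpliterator.parallelize(scanner.useDelimiter("DELIMITER").tokens(), 1).map(MyElementParser::parse). A batch size of 1 favors slow, blocking work such as the DB writes, while larger batches reduce per-task overhead. Note that the try-with-resources in getStream closes the Scanner before the returned stream is consumed, so the Scanner would have to stay open until the terminal operation finishes.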

huangapple
  • Published on May 4, 2020 at 02:12:27
  • When reposting, please keep the link to this article: https://java.coder-hub.com/61579402.html