Trying to read a Flux<DataBuffer> into an InputStream using piped input/output streams blocks and times out

Question

I have a pipeline that streams DataBuffers for various types of files. Recently I noticed a combination of source and target that blocks completely, causing a timeout. I have narrowed it down to an InputStream source and a target that performs InputStream.readAllBytes() or IOUtils.toByteArray(InputStream) from Apache Commons IO.

@Test
public void testReadAllBytesFromInputStream() throws IOException {
        Path p = testResources.resolve("file.txt");
        // Source: a Flux of 512-byte buffers read from a plain InputStream
        Flux<DataBuffer> buffer = DataBufferUtils.readInputStream(
                () -> new FileInputStream(p.toFile()), leakAwareDataBufferFactory, 512);
        byte[] bytes = getBytesFromFlux(buffer);
        assertTrue(bytes.length > 0);
}

public byte[] getBytesFromFlux(Flux<DataBuffer> source) throws IOException {
        PipedOutputStream osPipe = new PipedOutputStream();
        PipedInputStream isPipe = new PipedInputStream(osPipe);
        // Write the buffers into the pipe; close the pipe on error or completion
        DataBufferUtils.write(source, osPipe)
            .onErrorResume(throwable -> {
                try {
                    osPipe.close();
                } catch (IOException ioe) {
                    // ignored
                }
                return Flux.error(throwable);
            }).doOnComplete(() -> {
                try {
                    osPipe.close();
                } catch (IOException ioe) {
                    // ignored
                }
            }).subscribe(DataBufferUtils.releaseConsumer());
        // Target: drain the read side of the pipe into a byte array
        return isPipe.readAllBytes();
}

Simply doing a blocking call and merging them in will not work, as it would wreck the performance of consumers that stream correctly.
When reading from a FileChannel, it does not block. It seems to be a blocking issue between the piped streams and the byte consumer.
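
One plausible reading of the suspected pipe issue, offered as an assumption rather than a confirmed diagnosis: a PipedInputStream has a small internal buffer (1024 bytes by default), so a writer blocks once that buffer is full until some other thread reads from it. If the writing happens on the same thread that later calls readAllBytes(), neither side can make progress. The sketch below uses only the JDK (no Spring or Reactor) and a hypothetical class PipeDemo with a hypothetical helper copyThroughPipe; it shows the pattern of writing on a separate thread while the caller drains the pipe.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class; the names are illustrative, not from the question.
class PipeDemo {

    // Pushes `payload` through a pipe: a separate thread writes,
    // the calling thread reads until the writer closes the pipe.
    static byte[] copyThroughPipe(byte[] payload) throws IOException, InterruptedException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out); // default buffer: 1024 bytes

        Thread writer = new Thread(() -> {
            // try-with-resources closes the write side, which signals end of stream
            try (PipedOutputStream o = out) {
                o.write(payload); // blocks whenever the pipe buffer is full
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        writer.start();

        byte[] result = in.readAllBytes(); // returns once the writer has closed the pipe
        writer.join();
        return result;
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = new byte[8 * 1024]; // larger than the pipe's internal buffer
        byte[] copy = copyThroughPipe(payload);
        System.out.println(copy.length); // prints 8192
    }
}
```

In the Reactor pipeline from the question, the analogous change would presumably be to move the subscription off the calling thread, for example with subscribeOn(Schedulers.boundedElastic()) before subscribe(...). That is untested here and is only a guess at why the FileChannel-based source behaves differently.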

huangapple
  • Posted on May 5, 2020, 02:03:06
  • When reposting, please keep the link to this article: https://java.coder-hub.com/61598713.html