Reading a Flux<DataBuffer> into an InputStream using piped input/output streams blocks and times out
I have a pipeline that streams DataBuffers for various types of files. Recently I noticed that one combination of source and target blocks completely, causing a timeout. I have narrowed it down to an InputStream source and a target that calls InputStream.readAllBytes() or IOUtils.toByteArray(InputStream) from Apache Commons IO.
@Test
public void testReadAllBytesFromInputStream() throws IOException {
    Path p = testResources.resolve("file.txt");
    // Source: an InputStream read in 512-byte chunks
    Flux<DataBuffer> buffer = DataBufferUtils.readInputStream(
            () -> new FileInputStream(p.toFile()), leakAwareDataBufferFactory, 512);
    byte[] bytes = getBytesFromFlux(buffer);
    assertTrue(bytes.length > 0);
}

public byte[] getBytesFromFlux(Flux<DataBuffer> source) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);
    // Write the buffers into the pipe; close the write end on error or completion
    DataBufferUtils.write(source, osPipe)
            .onErrorResume(throwable -> {
                try {
                    osPipe.close();
                } catch (IOException ioe) {
                    // ignored: nothing useful to do if close fails
                }
                return Flux.error(throwable);
            }).doOnComplete(() -> {
                try {
                    osPipe.close();
                } catch (IOException ioe) {
                    // ignored: nothing useful to do if close fails
                }
            }).subscribe(DataBufferUtils.releaseConsumer());
    // Target: read everything from the read end of the pipe
    return isPipe.readAllBytes();
}
Simply making a blocking call to collect the buffers and merging them is not an option, as it would wreck the performance of consumers that stream correctly.
When the source reads from a FileChannel instead, it does not block. It seems to be a blocking issue between the piped streams and the byte consumer.
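For reference, here is a minimal JDK-only sketch of the piped-stream behavior that may be involved. It assumes (this is not confirmed for the Reactor code above) that the hang comes from the pipe being written and read on the same thread: a PipedInputStream has a 1024-byte buffer by default, and a write blocks once the buffer is full until a reader drains it. The names PipedCopyDemo and readAllViaPipe are hypothetical and exist only for this illustration. With the writer on its own thread, readAllBytes() returns normally:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

public class PipedCopyDemo {

    // Hypothetical helper: pushes `data` through a pipe and reads it back.
    static byte[] readAllViaPipe(byte[] data) throws IOException, InterruptedException {
        PipedOutputStream osPipe = new PipedOutputStream();
        // Default internal buffer size is 1024 bytes.
        PipedInputStream isPipe = new PipedInputStream(osPipe);

        // The writer runs on its own thread, so a full pipe buffer only
        // pauses the writer until the reader below drains it.
        Thread writer = new Thread(() -> {
            try (osPipe) { // closing the write end signals end-of-stream
                osPipe.write(data);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, "pipe-writer");
        writer.start();

        // The reader stays on the calling thread.
        byte[] result = isPipe.readAllBytes();
        writer.join();
        return result;
    }

    public static void main(String[] args) throws Exception {
        byte[] data = new byte[5000]; // larger than the 1024-byte pipe buffer
        System.out.println(readAllViaPipe(data).length); // prints 5000
    }
}
```

If the same assumption holds for the Reactor pipeline, the analogous change would presumably be to run the subscription on another thread (for example via subscribeOn(Schedulers.boundedElastic())) before calling readAllBytes(); that variant is untested here.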