英文:
Choose between UnpooledByteBufAllocator and Pooled in netty when merge many ByteBufs
问题
我使用 AsyncHttpClient 库来进行文件下载。
例如,我需要从一个主机下载 500 个文件,每个文件大小约为 10MB,并将这些文件写入磁盘。对于这个示例,我将提供以下代码:
- 声明 AsyncHttpClient。我使用 ResponseBodyPartFactory.LAZY 来避免复制到 Java 堆中
final AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
.setFollowRedirect(true)
.setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY)
.setUseOpenSsl(true)
.setRequestTimeout(6000000)
.setUseNativeTransport(true)
.build());
- 声明 ExecutorService,将用于写入文件。使用固定大小,因为默认的执行器服务可能会创建 1000 多个线程(它使用 cachedThreadPool)
final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
- 文件下载的代码。我使用 localhost,因为我想获取最大的下载速度,然后将这些文件写回磁盘。主要部分在方法 onBodyPartReceived 中:
for (int i = 0; i < downloadCount; i++) {
asyncHttpClient.prepareGet("http://localhost/Pizigani_1367_Chart_10MB.jpg").execute(new AsyncHandler<Void>() {
// ... 其他回调方法
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) {
// ... 其他处理
if (compositeByteBuf.numComponents() == compositeByteBuf.maxNumComponents() || content.isLast()) {
final ByteBuf mergedDirectBuffer = UnpooledByteBufAllocator.DEFAULT.directBuffer(compositeByteBuf.readableBytes()); //or Pooled?
for (ByteBuffer b : compositeByteBuf.nioBuffers()) {
mergedDirectBuffer.writeBytes(b);
}
compositeByteBuf.release();
ByteBuffer asyncBuffer = mergedDirectBuffer.nioBuffer().rewind();
long currentFilePosition = this.currentFilePosition;
this.currentFilePosition += mergedDirectBuffer.readableBytes();
this.compositeByteBuf = Unpooled.compositeBuffer(16);
fileChannel.write(asyncBuffer, currentFilePosition, mergedDirectBuffer, new CompletionHandler<>() {
// ... 写入完成和失败的处理
});
}
return State.CONTINUE;
}
});
}
我会总结一些要点:
-
保留来自响应的 netty ByteBuf。
final ByteBuf responseByteBuf = ((LazyResponseBodyPart) content).getBuf().retain();
-
将此缓冲区添加到组合字节缓冲区中,这样可以避免频繁地向磁盘写入,所以我保存了 16 个缓冲区,然后一次性将它们写入磁盘。
final CompositeByteBuf compositeByteBuf = this.compositeByteBuf.addComponent(true, responseByteBuf);
-
当组合缓冲区的大小达到 16 或者内容已经结束时,我会分配一个新的直接缓冲区,使用 netty 的 PoolingByteBufAllocator,大小是这 16 个缓冲区的总和,并将字节写入其中。然后我会释放 netty 的 16 个缓冲区,并将我的直接缓冲区写入文件,当文件写入完成后,释放直接缓冲区。
if (compositeByteBuf.numComponents() == compositeByteBuf.maxNumComponents() || content.isLast()) { final ByteBuf mergedDirectBuffer = UnpooledByteBufAllocator.DEFAULT.directBuffer(compositeByteBuf.readableBytes()); //or Pooled? for (ByteBuffer b : compositeByteBuf.nioBuffers()) { mergedDirectBuffer.writeBytes(b); } compositeByteBuf.release(); ByteBuffer asyncBuffer = mergedDirectBuffer.nioBuffer().rewind(); long currentFilePosition = this.currentFilePosition; this.currentFilePosition += mergedDirectBuffer.readableBytes(); this.compositeByteBuf = Unpooled.compositeBuffer(16); fileChannel.write(asyncBuffer, currentFilePosition, mergedDirectBuffer, new CompletionHandler<>() { // ... 写入完成和失败的处理 }); }
所以问题是,当将缓冲区合并为新的直接缓冲区时,我应该选择使用 Pooled 还是 Unpooled?只是因为我的一些测量结果显示 Unpooled 更快,而 Pooled 经常不能释放内存。
并且写入合并缓冲区到磁盘的方法是否正确?也许有更好的方法吗?我不太了解 netty。
英文:
I use AsyncHttpClient library for files download.
For example, I need to download 500 files from one host, each size about 10 MB, and write these files to disk. For this example, I will provide the below code:
-
Declare AsyncHttpClient. I use ResponseBodyPartFactory.LAZY to avoid copying into java heap
final AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config() .setFollowRedirect(true) .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY) .setUseOpenSsl(true) .setRequestTimeout(6000000) .setUseNativeTransport(true) .build());
-
Declare ExecutorService, which will be used to write files. Use fixed size because default executor service can create 1000+ threads (he use cachedThreadPool)
final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2)
-
Code for files download. I used localhost because I want to get the maximum download speed and then write those files back to disk. The main part is in the method onBodyPartReceived
for (int i = 0; i < downloadCount; i++) { asyncHttpClient.prepareGet("http://localhost/Pizigani_1367_Chart_10MB.jpg").execute(new AsyncHandler<Void>() { private long currentFilePosition; private AsynchronousFileChannel fileChannel; private CompositeByteBuf compositeByteBuf; @Override public AsyncHandler.State onStatusReceived(HttpResponseStatus status) throws IOException { if (status.getStatusCode() != 200) { return State.ABORT; } this.fileChannel = AsynchronousFileChannel.open(Paths.get("/tmp/" + UUID.randomUUID().toString() + "file.jpg"), EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE), executorService); this.compositeByteBuf = Unpooled.compositeBuffer(16); return State.CONTINUE; } @Override public State onHeadersReceived(HttpHeaders headers) { return State.CONTINUE; } @Override public void onThrowable(Throwable t) { } @Override public Void onCompleted() { return null; } @Override public State onBodyPartReceived(HttpResponseBodyPart content) { final ByteBuf responseByteBuf = ((LazyResponseBodyPart) content).getBuf().retain(); final CompositeByteBuf compositeByteBuf = this.compositeByteBuf.addComponent(true, responseByteBuf); if (compositeByteBuf.numComponents() == compositeByteBuf.maxNumComponents() || content.isLast()) { final ByteBuf mergedDirectBuffer = UnpooledByteBufAllocator.DEFAULT.directBuffer(compositeByteBuf.readableBytes()); //or Pooled? for (ByteBuffer b : compositeByteBuf.nioBuffers()) { mergedDirectBuffer.writeBytes(b); } compositeByteBuf.release(); ByteBuffer asyncBuffer = mergedDirectBuffer.nioBuffer().rewind(); long currentFilePosition= this.currentFilePosition; this.currentFilePosition += mergedDirectBuffer.readableBytes(); this.compositeByteBuf = Unpooled.compositeBuffer(16); fileChannel.write(asyncBuffer, currentFilePosition, mergedDirectBuffer, new CompletionHandler<>() { @Override public void completed(Integer result, ByteBuf attachment) { attachment.release(); if (content.isLast()) { try { fileChannel.close(); } catch (IOException e) { throw new UncheckedIOException(e); } } } @Override public void failed(Throwable exc, ByteBuf attachment) { attachment.release(); try { fileChannel.close(); } catch (IOException e) { throw new UncheckedIOException(e); } } }); } return State.CONTINUE; } });
}
I'll write down some points:
- Retain netty byteBuf from response.
final ByteBuf responseByteBuf = ((LazyResponseBodyPart) content).getBuf().retain()
;
- Add this buffer to composite byte buf, It was done so that there were not many write operations to the disk, so I save up 16 buffers and only then write them to disk at once.
final CompositeByteBuf compositeByteBuf = this.compositeByteBuf.addComponent(true, responseByteBuf);
-
When size of composite buf is 16 or content is last, i allocate new direct buffer, using netty PoolingByteBufAllocator, which size is sum of all 16 buffers and write bytes into this one. After i release netty 16 bufs and write my direct buffer to file, when file written, then release my direct buffer.
if (compositeByteBuf.numComponents() == compositeByteBuf.maxNumComponents() || content.isLast()) { final ByteBuf mergedDirectBuffer = UnpooledByteBufAllocator.DEFAULT.directBuffer(compositeByteBuf.readableBytes()); //or Pooled? for (ByteBuffer b : compositeByteBuf.nioBuffers()) { mergedDirectBuffer.writeBytes(b); } compositeByteBuf.release(); ByteBuffer asyncBuffer = mergedDirectBuffer.nioBuffer().rewind(); long currentFilePosition= this.currentFilePosition; this.currentFilePosition += mergedDirectBuffer.readableBytes(); this.compositeByteBuf = Unpooled.compositeBuffer(16); fileChannel.write(asyncBuffer, currentFilePosition, mergedDirectBuffer, new CompletionHandler<>() {
So question.
Should i prefer Pooled or Unpooled when merging buffers into a new direct buffer? It's just that some of my measurements show that Unpooled is faster, and Pooled often does not free memory.
And is the approach with writing the combined buffer to disk correct? maybe there are better ways?
I don't know netty very well.
专注分享java语言的经验与见解,让所有开发者获益!
评论