英文:
Netty server buffer read multiple messages at once
问题
1. Current problem
- 1个Netty客户端,1个Netty服务器
- 在Netty客户端中,3个不同的Kafka线程通过1个ChannelHandler向Netty服务器发送消息
- 在Netty客户端侧,似乎通道处理程序会立即发送每个线程的消息。
但在Netty服务器侧,似乎Netty会一次性读取从3个Kafka线程发送的多条消息。 - 我通过日志记录来验证了这一点:
客户端日志图像,服务器端日志图像
2. 代码
Netty客户端代码
public class ClientHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.debug("channelActive");
this.ctx = ctx;
}
public void sendMessage(String message) {
log.info("Sent message: {}", message);
ByteBuf messageBuffer = Unpooled.buffer();
messageBuffer.writeBytes(message.getBytes());
ctx.writeAndFlush(messageBuffer);
}
Netty服务器代码
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
NettyServer.ServerStatus.increaseAndGetTotalReadMsg();
ByteBuf byteBuf = (ByteBuf)msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
byteBuf.release();
}
}
这里说writeAndFlush
是线程安全的,但看起来似乎不是。这是正常情况还是异常情况?
英文:
1. Current problem
- 1 Netty client, 1 Netty server
- In netty client, 3 different kafka threads send message to netty server by 1 ChannelHandler
- On netty client side, it seems like channel handler send each thread's message at once.
But on netty server side, it seems like netty read multiple messages sent from 3 kafka threads at once. - I checked this by logging :
Client side log image, Server side log image
2. Code
Netty client code
public class ClientHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.debug("channelActive");
this.ctx = ctx;
}
public void sendMessage(String message) {
log.info("Sent message: {}", message);
ByteBuf messageBuffer = Unpooled.buffer();
messageBuffer.writeBytes(message.getBytes());
ctx.writeAndFlush(messageBuffer);
}
Netty server code
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
NettyServer.ServerStatus.increaseAndGetTotalReadMsg();
ByteBuf byteBuf = (ByteBuf)msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
byteBuf.release();
}
}
Here says that writeAndFlush
is thread-safe but it seem's doesn't. Is it normal case or not?
专注分享java语言的经验与见解,让所有开发者获益!
评论