Netty服务器一次性读取多个消息的缓冲区内容。

huangapple 未分类评论44阅读模式
英文:

Netty server buffer read multiple messages at once

问题

1. Current problem

  • 1个Netty客户端,1个Netty服务器
  • 在Netty客户端中,3个不同的Kafka线程通过1个ChannelHandler向Netty服务器发送消息
  • 在Netty客户端侧,似乎通道处理程序会立即发送每个线程的消息。
    但在Netty服务器侧,似乎Netty会一次性读取从3个Kafka线程发送的多条消息。
  • 我通过日志记录来验证了这一点:
    客户端日志图像服务器端日志图像

2. 代码

Netty客户端代码
public class ClientHandler extends ChannelInboundHandlerAdapter {

    ...

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.debug("channelActive");
        this.ctx = ctx;
    }

    public void sendMessage(String message) {
        log.info("Sent message: {}", message);
        ByteBuf messageBuffer = Unpooled.buffer();
        messageBuffer.writeBytes(message.getBytes());
        ctx.writeAndFlush(messageBuffer);
    }

Netty服务器代码
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    ...

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        NettyServer.ServerStatus.increaseAndGetTotalReadMsg();
        ByteBuf byteBuf = (ByteBuf)msg;
        log.info(byteBuf.toString(Charset.defaultCharset()));
        byteBuf.release();
    }
}

这里writeAndFlush是线程安全的,但看起来似乎不是。这是正常情况还是异常情况?

英文:

1. Current problem

  • 1 Netty client, 1 Netty server
  • In netty client, 3 different kafka threads send message to netty server by 1 ChannelHandler
  • On netty client side, it seems like channel handler send each thread's message at once.
    But on netty server side, it seems like netty read multiple messages sent from 3 kafka threads at once.
  • I checked this by logging :
    Client side log image, Server side log image

2. Code

Netty client code
public class ClientHandler extends ChannelInboundHandlerAdapter {

    ...

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.debug("channelActive");
        this.ctx = ctx;
    }

    public void sendMessage(String message) {
        log.info("Sent message: {}", message);
        ByteBuf messageBuffer = Unpooled.buffer();
        messageBuffer.writeBytes(message.getBytes());
        ctx.writeAndFlush(messageBuffer);
    }

Netty server code
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    ...

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        NettyServer.ServerStatus.increaseAndGetTotalReadMsg();
        ByteBuf byteBuf = (ByteBuf)msg;
        log.info(byteBuf.toString(Charset.defaultCharset()));
        byteBuf.release();
    }
}

Here says that writeAndFlush is thread-safe but it seem's doesn't. Is it normal case or not?

huangapple
  • 本文由 发表于 2020年4月3日 20:56:05
  • 转载请务必保留本文链接:https://java.coder-hub.com/61012419.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定