Incorrect Protocol Buffer Deserialization/Marshaling In Spring AMQP

Question

I am trying to figure out why my protocol buffer payload is deserialized incorrectly when it is received by a Spring AMQP listener in my Java app.
The protocol buffer is written by a Go service and sent to a RabbitMQ instance; the Java Spring app then picks it up and deserializes it into an object instance.

For some reason, the unmarshalled object has the value of field 1 in field 2, while field 1 itself is null.
Additionally, the repeated network_interfaces field with key 5 is null on the object.

Interestingly, if I use the debugger when picking up a message from the queue, the missing fields are visible under unknownFields on the instantiated protobuf object and contain the correct data, so it looks like the proto parser is not working correctly.
The debugger shows that the field keys are off, i.e. field 1 has the key 2 and so on.
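
One way to double-check what the debugger shows, independent of any generated class, is to list the field numbers found in the raw message body. The following is a minimal sketch in Go that uses only the standard library. `Field`, `ListFields`, and the hand-built sample payload are illustrative names and data, not part of either service, and the sketch only walks the top level of the message.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Field is one top-level key found in a serialized protobuf message.
type Field struct {
	Number   uint64 // field number as written by the sender
	WireType uint64 // 0 = varint, 1 = 64-bit, 2 = length-delimited, 5 = 32-bit
}

// ListFields walks the top level of a protobuf payload and returns the
// field numbers and wire types in the order they appear on the wire.
func ListFields(b []byte) ([]Field, error) {
	var out []Field
	for len(b) > 0 {
		// Every field starts with a varint tag: (field number << 3) | wire type.
		tag, n := binary.Uvarint(b)
		if n <= 0 {
			return nil, errors.New("bad tag varint")
		}
		b = b[n:]
		f := Field{Number: tag >> 3, WireType: tag & 7}
		switch f.WireType {
		case 0: // varint value
			_, m := binary.Uvarint(b)
			if m <= 0 {
				return nil, errors.New("bad varint value")
			}
			b = b[m:]
		case 1: // fixed 64-bit value
			if len(b) < 8 {
				return nil, errors.New("truncated 64-bit value")
			}
			b = b[8:]
		case 2: // length-delimited: strings, bytes, nested messages
			l, m := binary.Uvarint(b)
			if m <= 0 || uint64(len(b)-m) < l {
				return nil, errors.New("truncated length-delimited value")
			}
			b = b[m+int(l):]
		case 5: // fixed 32-bit value
			if len(b) < 4 {
				return nil, errors.New("truncated 32-bit value")
			}
			b = b[4:]
		default:
			return nil, fmt.Errorf("unsupported wire type %d", f.WireType)
		}
		out = append(out, f)
	}
	return out, nil
}

func main() {
	// Hand-built sample: field 2 holds the string "web01", field 5 holds an
	// empty nested message. A real check would use the published body instead.
	payload := []byte{0x12, 0x05, 'w', 'e', 'b', '0', '1', 0x2a, 0x00}
	fields, err := ListFields(payload)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, f := range fields {
		fmt.Printf("field %d (wire type %d)\n", f.Number, f.WireType)
	}
}
```

Running something like this on the bytes the Go service publishes, and comparing the numbers with the .proto file, would show whether the sender or the receiver is working from a different message definition.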

So far I have:

  • Verified that the proto files are the same in both services. The Spring app uses the proto file from the other repo by importing it as a git submodule.
  • Attempted to use a Spring.messaging.amqp MessageConverter to deserialize the proto explicitly based on message headers, with no success.
  • Tried different message content type and encoding headers when sending the message. These don't appear to make a difference, as I am only using the message body.
  • Verified that if the Go app listens to the queue, it can correctly deserialize and unmarshal a message it sent, so this has to be a Spring config issue or similar.

The Proto3 message definition in question:

message HostStateMessage {

  message NetworkInterface {
    string name = 1;
    string mac_address = 2;
    string ipv4 = 3;
    string ipv6 = 4;
  }

  string hostname = 1;
  string description = 2;
  HostType host_type = 3;
  repeated NetworkInterface network_interfaces = 4;
}

The Go function which writes the protocol buffer to RabbitMQ as a message:

func PublishHostMessage(config *AMQPQueueConfiguration, conn *amqp.Connection, message *protos.HostStateMessage) error {
	channel, err := conn.Channel()
	if err != nil {
		// fmt.Errorf only builds an error value, so return it instead of discarding it.
		return fmt.Errorf("creating channel failed: %w", err)
	}
	// Close the channel when done so it is not leaked on every publish.
	defer channel.Close()

	queue, err := channel.QueueDeclare(config.HostQueueName, true, false, false, false, nil)
	if err != nil {
		return fmt.Errorf("failed to declare queue: %w", err)
	}

	serialMsg, err := proto.Marshal(message)
	if err != nil {
		return fmt.Errorf("failed to serialize proto message: %w", err)
	}
	headers := amqp.Table{}
	headers["messageType"] = "HostStateMessage"
	fmt.Println(string(serialMsg))
	err = channel.Publish("", queue.Name, false, false, amqp.Publishing{Headers: headers, ContentType: "application/x-protobuf", Body: serialMsg})
	if err != nil {
		return fmt.Errorf("failed to send message: %w", err)
	}

	fmt.Printf("Sent host message at: %s\n", time.Now())
	return nil
}

The MessageReceiver.java Class:

@Service
public class MessageReceiver {

  @Autowired RabbitConfiguration rabbitConfiguration;
  @Autowired HostDao hostDao;

  @RabbitListener(queues = "#{rabbitConfiguration.hostQueueName}")
  public void consumeHostNotification(Message in) {
    try {
      HostStateMessage message = HostStateMessage.parseFrom(in.getBody());
      hostDao.addOrUpdateHostStateFromMessage(message);
    } catch (InvalidProtocolBufferException e) {
      // Printing e.getStackTrace() only shows the array reference; print the trace itself.
      e.printStackTrace();
    }
  }
}

The Spring AMQP @Configuration bean:

@Configuration
@EnableAutoConfiguration
public class RabbitConfiguration {

  @Value("${rabbitmq.hostname}")
  private String queueHost;

  @Value("${rabbitmq.port}")
  private int queuePort;

  @Value("${rabbitmq.queues.host-queue-name}")
  public String hostQueueName;

  @Bean
  public CachingConnectionFactory connectionFactory() {
    return new CachingConnectionFactory(queueHost, queuePort);
  }

  @Bean
  public Queue hostNotificationQueue() {
    return new Queue(hostQueueName);
  }
}

Go is using google.golang.org/protobuf v1.27.1 in Go modules.

Java is using com.google.protobuf 3.19.4 as a Maven dependency, along with protobuf-maven-plugin 0.6.1 to do the compilation with protoc.

This is a really confusing issue, and it would be great to get some insight.

Answer 1

Score: -1

After checking multiple times that the Java generated protobuf class was correct, I realised that I had not verified the Go generated protobuf.

Turns out the script I was using to call protoc-gen-go had failed silently and the Go service was using an outdated version of the message with an additional field with the key 1.
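
A silent failure like this can be avoided by making the generation step stop on any error. The following is a minimal sketch in Go, assuming protoc and protoc-gen-go are on the PATH. `runStep` is an illustrative helper and `protos/host_state.proto` is a hypothetical path; neither comes from the original project.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
)

// runStep runs one command of a code generation pipeline. If the command
// cannot be started or exits with a non-zero status, it returns an error
// that includes the combined stdout and stderr output.
func runStep(name string, args ...string) error {
	out, err := exec.Command(name, args...).CombinedOutput()
	if err != nil {
		return fmt.Errorf("%s %v failed: %w\n%s", name, args, err, out)
	}
	return nil
}

func main() {
	// Hypothetical invocation; adjust the flags and paths to the real layout.
	err := runStep("protoc",
		"--go_out=.",
		"--go_opt=paths=source_relative",
		"protos/host_state.proto")
	if err != nil {
		// Exit non-zero so a build script or CI job stops here instead of
		// carrying on with stale generated code.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

The same idea applies to a shell script: the build should abort as soon as the generator returns a non-zero exit status.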

huangapple
  • Published on March 1, 2022 at 21:23:44
  • When reposting, please keep the link to this article: https://java.coder-hub.com/71309150.html