Incorrect Protocol Buffer Deserialization/Marshaling In Spring AMQP
Question
I am trying to figure out why my protocol buffer payload is deserialized incorrectly when it is received by a Spring AMQP listener in my Java app.
The protocol buffer is written by a Go service and sent to a RabbitMQ instance; the Java Spring app then picks it up and deserializes it into an object instance.
For some reason, the received object has field 2 holding the value of field 1, while field 1 is null in the resulting object.
Additionally, the repeated network_interfaces field, with key 5, is null on the object.
Interestingly, if I use the debugger when picking up a message from the queue, the missing fields are visible on the instantiated protobuf object under unknownFields and contain the correct data, so it looks like the proto parser is not working correctly.
The debugger shows that the field keys are off, i.e. field 1 has the key 2 and so on.
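One way to confirm what the debugger shows, independently of any generated class, is to read the field numbers straight off the wire. The following is a minimal sketch, assuming only the standard protobuf wire format (each record starts with a varint tag whose low three bits are the wire type and whose remaining bits are the field number). The helper name fieldNumbers and the hand-built payload are illustrative and not taken from either service.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// fieldNumbers walks the top level of a protobuf-encoded payload and returns
// the field number of every record, in order, without needing any schema.
func fieldNumbers(buf []byte) ([]uint64, error) {
	var nums []uint64
	for len(buf) > 0 {
		tag, n := binary.Uvarint(buf)
		if n <= 0 {
			return nil, errors.New("malformed tag varint")
		}
		buf = buf[n:]
		nums = append(nums, tag>>3) // upper bits are the field number

		switch tag & 7 { // low three bits are the wire type
		case 0: // varint value
			_, n := binary.Uvarint(buf)
			if n <= 0 {
				return nil, errors.New("malformed varint value")
			}
			buf = buf[n:]
		case 1: // fixed 64-bit value
			if len(buf) < 8 {
				return nil, errors.New("truncated 64-bit value")
			}
			buf = buf[8:]
		case 2: // length-delimited value (strings, bytes, nested messages)
			l, n := binary.Uvarint(buf)
			if n <= 0 || l > uint64(len(buf)-n) {
				return nil, errors.New("malformed length-delimited value")
			}
			buf = buf[n+int(l):]
		case 5: // fixed 32-bit value
			if len(buf) < 4 {
				return nil, errors.New("truncated 32-bit value")
			}
			buf = buf[4:]
		default:
			return nil, fmt.Errorf("unsupported wire type %d", tag&7)
		}
	}
	return nums, nil
}

func main() {
	// Hypothetical payload: two string records carrying field numbers 2 and 3.
	// A sender using the .proto shown in this question would put hostname in
	// field 1, so its first tag byte would be 0x0a rather than 0x12.
	payload := []byte{0x12, 0x03, 'w', 'e', 'b', 0x1a, 0x02, 'h', 'i'}
	nums, err := fieldNumbers(payload)
	fmt.Println(nums, err) // prints "[2 3] <nil>"
}
```

If the numbers read from a real message body do not match the .proto, the sender was probably built from a different schema than the receiver.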
So far I have:
- Verified the proto files are the same in both services; the Spring app uses the proto file from the other repo, imported as a git submodule.
- Attempted to use a Spring.messaging.amqp MessageConverter to deserialize the proto explicitly based on message headers, with no success.
- Tried different message content type and encoding headers when sending the message; these don't appear to make a difference, as I am only using the message body.
- Verified that if the Go app listens to the queue, it can correctly deserialize and unmarshal a message it sent, so this has to be a Spring config issue or similar.
The Proto3 message definition in question:
    message HostStateMessage {
        message NetworkInterface {
            string name = 1;
            string mac_address = 2;
            string ipv4 = 3;
            string ipv6 = 4;
        }
        string hostname = 1;
        string description = 2;
        HostType host_type = 3;
        repeated NetworkInterface network_interfaces = 4;
    }
The Go function which writes the protocol buffer to RabbitMQ as a message:
    func PublishHostMessage(config *AMQPQueueConfiguration, conn *amqp.Connection, message *protos.HostStateMessage) error {
        channel, err := conn.Channel()
        if err != nil {
            // fmt.Errorf only builds an error value, so return it instead of discarding it.
            return fmt.Errorf("creating channel failed: %w", err)
        }
        // Release the channel once the message has been published.
        defer channel.Close()

        queue, err := channel.QueueDeclare(config.HostQueueName, true, false, false, false, nil)
        if err != nil {
            return fmt.Errorf("failed to declare queue: %w", err)
        }

        serialMsg, err := proto.Marshal(message)
        if err != nil {
            return fmt.Errorf("failed to serialize proto message: %w", err)
        }

        headers := amqp.Table{}
        headers["messageType"] = "HostStateMessage"
        // The payload is binary, so log it as hex rather than as a string.
        fmt.Printf("%x\n", serialMsg)

        err = channel.Publish("", queue.Name, false, false, amqp.Publishing{Headers: headers, ContentType: "application/x-protobuf", Body: serialMsg})
        if err != nil {
            return fmt.Errorf("failed to send message: %w", err)
        }
        fmt.Printf("Sent host message at: %s\n", time.Now())
        return nil
    }
The MessageReceiver.java Class:
    @Service
    public class MessageReceiver {
        @Autowired RabbitConfiguration rabbitConfiguration;
        @Autowired HostDao hostDao;

        @RabbitListener(queues = "#{rabbitConfiguration.hostQueueName}")
        public void consumeHostNotification(Message in) {
            try {
                // Parse the raw AMQP body directly; headers are not consulted.
                HostStateMessage message = HostStateMessage.parseFrom(in.getBody());
                hostDao.addOrUpdateHostStateFromMessage(message);
            } catch (InvalidProtocolBufferException e) {
                // Printing e.getStackTrace() only shows the array reference.
                e.printStackTrace();
            }
        }
    }
The Spring AMQP @Configuration bean:
    @Configuration
    @EnableAutoConfiguration
    public class RabbitConfiguration {
        @Value("${rabbitmq.hostname}")
        private String queueHost;

        @Value("${rabbitmq.port}")
        private int queuePort;

        @Value("${rabbitmq.queues.host-queue-name}")
        public String hostQueueName;

        @Bean
        public CachingConnectionFactory connectionFactory() {
            return new CachingConnectionFactory(queueHost, queuePort);
        }

        @Bean
        public Queue hostNotificationQueue() {
            return new Queue(hostQueueName);
        }
    }
Go is using google.golang.org/protobuf v1.27.1 in Go modules.
Java is using com.google.protobuf 3.19.4 as a Maven dependency, along with protobuf-maven-plugin 0.6.1 to do the compilation with protoc.
This is a really confusing issue; it would be great to get some insight.
Answer 1
Score: -1
After checking multiple times that the Java-generated protobuf class was correct, I realised that I had not verified the Go-generated protobuf.
It turns out the script I was using to call protoc-gen-go had failed silently, and the Go service was using an outdated version of the message with an additional field with the key 1.
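A silent failure like this can be made loud by checking the exit status of each generation step. Below is a minimal sketch of a small Go wrapper, assuming the generator is invoked as an external command. The helper name runStep and the file name genproto.go are hypothetical, and this is not the script that was actually used.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
)

// runStep runs one external command and returns an error, including the
// command's combined output, if it cannot be started or exits non-zero.
func runStep(name string, args ...string) error {
	out, err := exec.Command(name, args...).CombinedOutput()
	if err != nil {
		return fmt.Errorf("%s failed: %w\n%s", name, err, out)
	}
	return nil
}

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: genproto <command> [args...]")
		return
	}
	// Run the command given on the command line and stop the build if it fails,
	// so stale generated code is never picked up unnoticed.
	if err := runStep(os.Args[1], os.Args[2:]...); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

It could be invoked as, for example, go run genproto.go protoc --go_out=. --go_opt=paths=source_relative path/to/message.proto, where the .proto path is a placeholder. A shell script gets the same effect by aborting on the first failing command.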