How to change unmarshaler in watermill (lib for Kafka) to avro
I'm trying to debug why it is not possible to de-serialize data we receive through Kafka, serialized with Avro according to a specific schema.
It looks like this: there is a producer written in Java which serializes data according to an Avro schema and sends it to a Kafka topic, and on the consumer side there is a program written in Golang.
But each time I try to de-serialize that data, I get an error message saying it was not possible to deserialize because the length of the string is less than zero (the dependencies field).
The schema looks like this:
{
    "namespace": "data.avro",
    "doc": "Docstring.",
    "name": "Avrodata",
    "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "theName", "type": "string"},
        {"name": "dependencies", "type": {"type": "array", "items": "string"}}
    ]
}
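For reference, a Go struct that this record could decode into when using hamba/avro might look like the sketch below. The field mapping via `avro` struct tags follows that library's tag convention; the struct name and field names are my own assumption, not from the question.

```go
package main

import "fmt"

// Avrodata mirrors the Avro record above. The avro struct tags map the
// Go fields to the schema's field names, as hamba/avro expects.
type Avrodata struct {
	ID           int      `avro:"id"`
	TheName      string   `avro:"theName"`
	Dependencies []string `avro:"dependencies"`
}

func main() {
	v := Avrodata{ID: 1, TheName: "example", Dependencies: []string{"a", "b"}}
	fmt.Println(v.ID, v.TheName, len(v.Dependencies))
}
```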
My most recent suspicion is that maybe I need to change this Unmarshaler to an Avro-based one, but I have no idea how to do that.
I use this avro library https://github.com/hamba/avro and https://watermill.io/pubsubs/kafka/ for Kafka.
Unmarshaler: kafka.DefaultMarshaler{},
I've tried to set Unmarshaler to something like avro.API{}, but I got a compiler error.
The subscriber is configured based on https://watermill.io/pubsubs/kafka/ :
kafka.SubscriberConfig{
    Brokers:               []string{"localhost:9092"},
    Unmarshaler:           kafka.DefaultMarshaler{},
    OverwriteSaramaConfig: saramaSubscriberConfig,
    ConsumerGroup:         "test_consumer_group",
},
If I send the data with Golang and receive it with Golang, everything works perfectly.
Answer 1
Score: 1
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
> In most cases, you can use the serializers and formatter directly and not worry about the details of how messages are mapped to bytes. However, if you’re working with a language that Confluent has not developed serializers for, or simply want a deeper understanding of how the Confluent Platform works, here is more detail on how data is mapped to low-level bytes.
The bytes from the 5th byte to the end of the payload contain the actual serialized data; the first byte is the magic byte and bytes 1-4 are the big-endian schema ID.
>Serialized data for the specified schema format (for example, binary encoding for Avro or Protocol Buffers). The only exception is raw bytes, which will be written directly without any special encoding.
So
avro.Unmarshal(schema, data[5:], out)
will unmarshal data that was serialized with the serializers Confluent has developed.
Cheers.
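The header-stripping step above can be sketched as follows. This is a minimal stdlib-only illustration of the Confluent wire format, assuming the magic byte is 0x0 as the linked docs describe; the helper name is mine, and the final avro.Unmarshal call is left as a comment since it needs the hamba/avro dependency and a parsed schema.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitConfluentPayload splits a Confluent-framed Kafka message into the
// 4-byte schema-registry ID and the Avro payload that follows the 5-byte
// header (magic byte 0x0 + big-endian schema ID).
func splitConfluentPayload(data []byte) (schemaID uint32, avroPayload []byte, err error) {
	if len(data) < 5 {
		return 0, nil, errors.New("payload shorter than the 5-byte Confluent header")
	}
	if data[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte: %#x", data[0])
	}
	schemaID = binary.BigEndian.Uint32(data[1:5])
	return schemaID, data[5:], nil
}

func main() {
	// Fake message: magic byte, schema ID 42, then the Avro-encoded bytes.
	msg := append([]byte{0, 0, 0, 0, 42}, []byte("avro-bytes")...)
	id, payload, err := splitConfluentPayload(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(id, len(payload))
	// With hamba/avro, the remainder would then be decoded as:
	//   err = avro.Unmarshal(schema, payload, &out)
}
```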