英文:
Custom conversion of Apache Kafka header using Spring Kafka
问题
我正在使用 Spring Boot 2.3.0 和 Spring Kafka 2.5.0,在我的 KafkaListener 中,我尝试将 MessageHeaders 映射到自定义类的映射。下面的代码有效,但是它将头部以 byte[] 形式给出,然后我必须在监听器内部将其转换为类(并且对每个监听器都要重复此操作),我希望避免这种情况。
```java
@Slf4j
@Component
@KafkaListener(topics = {"${spring.kafka.topics.simple}"}, groupId = "consumerGroup",
containerFactory = "kafkaListenerContainerFactory")
public class RequestConsumer {
@KafkaHandler
public void listen(@Payload CustomerDetails customerDetails, @Header("sec") byte[]
principle, @Headers MessageHeaders messageHeaders) {
log.info("Received a CustomerDetails");
}
@KafkaHandler(isDefault = true)
public void listen(@Payload(required = false) GenericRecord object, @Headers
MessageHeaders messageHeaders) {
log.info("Received an unexpected object");
}
}
当我将代码更改为以下内容时:
@KafkaHandler
public void listen(@Payload CustomerDetails customerDetails, @Header("sec") PreAuthenticatedAuthenticationToken principle, @Headers MessageHeaders messageHeaders) {
log.info("Received a CustomerDetails");
}
监听器将会因为以下错误而中断:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demo.consumer.RequestConsumer.listen(com.example.schemas.CustomerDetails,org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken,org.springframework.messaging.MessageHeaders)' threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [byte[]] to type [@org.springframework.messaging.handler.annotation.Header org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken]; nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [byte[]] to type [@org.springframework.messaging.handler.annotation.Header org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken]
我已经查阅了 https://docs.spring.io/spring-kafka/reference/html/#headers 文档,该文档讨论了从 Headers 到 MessageHeaders 的头映射以及反之,但没有提供转换这些头的示例。错误似乎表明我可以以某种方式注册转换器,但尽管进行了许多谷歌搜索并尝试了 Spring Kafka 代码的调试,我仍然没有找到具体方法。
非常感谢您的帮助。
致敬,
Oskar
<details>
<summary>英文:</summary>
I am using Spring Boot 2.3.0 with Spring Kafka 2.5.0 in in my KafkaListener I am trying to map to map of the MessageHeaders to a custom class. The code below works, but gives me the header in byte[] that I would then have to convert to the class inside the listener (and repeat that for every listener), which I would like to avoid.
@Slf4j
@Component
@KafkaListener(topics = {"${spring.kafka.topics.simple}"}, groupId = "consumerGroup",
containerFactory = "kafkaListenerContainerFactory")
public class RequestConsumer {
@KafkaHandler
public void listen(@Payload CustomerDetails customerDetails, @Header("sec") byte[]
principle, @Headers MessageHeaders messageHeaders) {
log.info("Received a CustomerDetails");
}
@KafkaHandler(isDefault = true)
public void listen(@Payload(required = false) GenericRecord object, @Headers
MessageHeaders messageHeaders) {
log.info("Received an unexpected object");
}
}
THe moment i change the code to this:
@KafkaHandler
public void listen(@Payload CustomerDetails customerDetails, @Header("sec") PreAuthenticatedAuthenticationToken principle, @Headers MessageHeaders messageHeaders) {
log.info("Received a CustomerDetails");
}
The listener will break with this error:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demo.consumer.RequestConsumer.listen(com.example.schemas.CustomerDetails,org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken,org.springframework.messaging.MessageHeaders)' threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [byte[]] to type [@org.springframework.messaging.handler.annotation.Header org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken]; nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [byte[]] to type [@org.springframework.messaging.handler.annotation.Header org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken]
I've looked over the https://docs.spring.io/spring-kafka/reference/html/#headers documentation which talks about header mapping from Headers to MessageHeaders and vise versa, but has no examples of converting those headers. The error seems to suggest that I can register a convertor somehow, but have not found out how despite many Google searches and trying to step through the code of Spring Kafka.
Help on this would be greatly appreciated.
Cheers,
Oskar
</details>
# 答案1
**得分**: 0
我坚持下来,找到了一种适当的方法来实现这一点。虽然我没有找到转换标头的方法,但可以编写一个handlerMethodArgumentResolver,允许我们在监听器中使用自定义对象。在这段代码内部,很容易将标头转换为PreAuthenticatedAuthenticationToken对象,然后可以直接在监听器中使用。
更多详细信息:https://docs.spring.io/spring-kafka/docs/2.5.3.RELEASE/reference/html/#adding-custom-handlermethodargumentresolver-to-kafkalistener
经验证有效。
干杯。
<details>
<summary>英文:</summary>
I kept at it, and I found a proper way to implement this. While I did not find a way to convert the header, it's possible write a handlerMethodArgumentResolver that allows us to use custom objects in listeners. Inside this code it's easy to convert the header to a PreAuthenticatedAuthenticationToken object which I can then use in the listener directly.
More details: https://docs.spring.io/spring-kafka/docs/2.5.3.RELEASE/reference/html/#adding-custom-handlermethodargumentresolver-to-kafkalistener
Validated to be working.
Cheers.
</details>
专注分享java语言的经验与见解,让所有开发者获益!
评论