自定义使用Spring Kafka转换Apache Kafka头部信息

huangapple 未分类评论45阅读模式
英文:

Custom conversion of Apache Kafka header using Spring Kafka

问题

我正在使用 Spring Boot 2.3.0 和 Spring Kafka 2.5.0在我的 KafkaListener 中我尝试将 MessageHeaders 映射到自定义类的映射下面的代码有效但是它将头部以 byte[] 形式给出然后我必须在监听器内部将其转换为类并且对每个监听器都要重复此操作),我希望避免这种情况

```java
    @Slf4j
    @Component
    @KafkaListener(topics = {"${spring.kafka.topics.simple}"}, groupId = "consumerGroup", 
    containerFactory = "kafkaListenerContainerFactory")
    public class RequestConsumer {

       @KafkaHandler
       public void listen(@Payload CustomerDetails customerDetails, @Header("sec") byte[] 
       principle, @Headers MessageHeaders messageHeaders) {
           log.info("Received a CustomerDetails");
       }

       @KafkaHandler(isDefault = true)
       public void listen(@Payload(required = false) GenericRecord object, @Headers 
       MessageHeaders messageHeaders) {
           log.info("Received an unexpected object");
       }
    }

当我将代码更改为以下内容时:

    @KafkaHandler
    public void listen(@Payload CustomerDetails customerDetails, @Header("sec") PreAuthenticatedAuthenticationToken principle, @Headers MessageHeaders messageHeaders) {
        log.info("Received a CustomerDetails");
    }

监听器将会因为以下错误而中断:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.demo.consumer.RequestConsumer.listen(com.example.schemas.CustomerDetails,org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken,org.springframework.messaging.MessageHeaders)' threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [byte[]] to type [@org.springframework.messaging.handler.annotation.Header org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken]; nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [byte[]] to type [@org.springframework.messaging.handler.annotation.Header org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken]

我已经查阅了 https://docs.spring.io/spring-kafka/reference/html/#headers 文档,该文档讨论了从 Headers 到 MessageHeaders 的头映射以及反之,但没有提供转换这些头的示例。错误似乎表明我可以以某种方式注册转换器,但尽管进行了许多谷歌搜索并尝试了 Spring Kafka 代码的调试,我仍然没有找到具体方法。

非常感谢您的帮助。

致敬,
Oskar


<details>
<summary>英文:</summary>

I am using Spring Boot 2.3.0 with Spring Kafka 2.5.0 in in my KafkaListener I am trying to map  to map of the MessageHeaders to a custom class. The code below works, but gives me the header in byte[] that I would then have to convert to the class inside the listener (and repeat that for every listener), which I would like to avoid.

@Slf4j
@Component
@KafkaListener(topics = {&quot;${spring.kafka.topics.simple}&quot;}, groupId = &quot;consumerGroup&quot;, 
containerFactory = &quot;kafkaListenerContainerFactory&quot;)
public class RequestConsumer {

   @KafkaHandler
   public void listen(@Payload CustomerDetails customerDetails, @Header(&quot;sec&quot;) byte[] 
   principle, @Headers MessageHeaders messageHeaders) {
       log.info(&quot;Received a CustomerDetails&quot;);
   }

   @KafkaHandler(isDefault = true)
   public void listen(@Payload(required = false) GenericRecord object, @Headers 
   MessageHeaders messageHeaders) {
       log.info(&quot;Received an unexpected object&quot;);
   }
}

THe moment i change the code to this:
@KafkaHandler
public void listen(@Payload CustomerDetails customerDetails, @Header(&quot;sec&quot;) PreAuthenticatedAuthenticationToken principle, @Headers MessageHeaders messageHeaders) {
    log.info(&quot;Received a CustomerDetails&quot;);
}
The listener will break with this error:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method &#39;public void com.example.demo.consumer.RequestConsumer.listen(com.example.schemas.CustomerDetails,org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken,org.springframework.messaging.MessageHeaders)&#39; threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [byte[]] to type [@org.springframework.messaging.handler.annotation.Header org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken]; nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [byte[]] to type [@org.springframework.messaging.handler.annotation.Header org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken]

I&#39;ve looked over the https://docs.spring.io/spring-kafka/reference/html/#headers documentation which talks about header mapping from Headers to MessageHeaders and vise versa, but has no examples of converting those headers. The error seems to suggest that I can register a convertor somehow, but have not found out how despite many Google searches and trying to step through the code of Spring Kafka.

Help on this would be greatly appreciated.

Cheers,
Oskar

</details>


# 答案1
**得分**: 0

我坚持下来,找到了一种适当的方法来实现这一点。虽然我没有找到转换标头的方法,但可以编写一个handlerMethodArgumentResolver,允许我们在监听器中使用自定义对象。在这段代码内部,很容易将标头转换为PreAuthenticatedAuthenticationToken对象,然后可以直接在监听器中使用。

更多详细信息:https://docs.spring.io/spring-kafka/docs/2.5.3.RELEASE/reference/html/#adding-custom-handlermethodargumentresolver-to-kafkalistener

经验证有效。

干杯。

<details>
<summary>英文:</summary>

I kept at it, and I found a proper way to implement this. While I did not find a way to convert the header, it&#39;s possible write a handlerMethodArgumentResolver that allows us to use custom objects in listeners. Inside this code it&#39;s easy to convert the header to a PreAuthenticatedAuthenticationToken object which I can then use in the listener directly.


More details: https://docs.spring.io/spring-kafka/docs/2.5.3.RELEASE/reference/html/#adding-custom-handlermethodargumentresolver-to-kafkalistener

Validated to be working.

Cheers.

</details>



huangapple
  • 本文由 发表于 2020年7月26日 22:23:15
  • 转载请务必保留本文链接:https://java.coder-hub.com/63101420.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定