RequestReplyFuture<String, String, List<Product>> not mapped, Instead it mapped to ArrayList<LinkedHashMap>

huangapple 未分类评论45阅读模式
英文:

RequestReplyFuture<String, String, List<Product>> not mapped, Instead it mapped to ArrayList<LinkedHashMap>

问题

@Service
public class ProductProducer implements IProductProducer{
  private final ReplyingKafkaTemplate<String, String, List<Product>> _replyTemplate;
    private static final Logger LOG = LoggerFactory.getLogger(ProductProducer.class);
    public ProductProducer(ReplyingKafkaTemplate<String, String, List<Product>> replyTemplate) {
        this._replyTemplate = replyTemplate;
    }

 @Override
    public List<ProductViewModel> GetProducts() throws InterruptedException, ExecutionException, TimeoutException {
        RequestReplyFuture<String, String, List<Product>> future =
                this._replyTemplate.sendAndReceive(new ProducerRecord<>(ProductTopicConstants.GET_PRODUCTS, 0, null, null));
            LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
            List<Product> products = future.get(10, TimeUnit.SECONDS).value(); --> Property not mapped to Product
var productViewModels = products.stream().map(item -> new ProductViewModel(item.getId(),item.getName(),item.getPrice(), item.getDescription())).collect(Collectors.toList());
            return productViewModels;
    }
}

# Kafka Configuration

/**
 * Spring configuration that declares the request/reply template, its reply
 * listener container, a plain Kafka template and the two topics involved.
 */
@Configuration
public class KafkaConfiguration {

    /**
     * Builds the replying template on top of the reply listener container.
     * The container factory's reply template is set before the container is created.
     *
     * @param pf               producer factory used for both templates
     * @param containerFactory factory that creates the reply container
     * @return the request/reply template
     */
    @Bean
    public ReplyingKafkaTemplate<String, String, List<Product>> replyer(
            ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<Product>> containerFactory) {
        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        return new ReplyingKafkaTemplate<>(pf, replyContainer(containerFactory));
    }

    /**
     * Creates the container listening on {@code ProductTopicConstants.GET_PRODUCTS_CONTAINER};
     * the same constant is used as the consumer group id.
     *
     * @param containerFactory factory that creates the container
     * @return the configured reply container
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, List<Product>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Product>> containerFactory) {
        final String replyTopic = ProductTopicConstants.GET_PRODUCTS_CONTAINER;
        ConcurrentMessageListenerContainer<String, List<Product>> replies =
                containerFactory.createContainer(replyTopic);
        replies.getContainerProperties().setGroupId(replyTopic);
        replies.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return replies;
    }

    /**
     * @param pf producer factory backing the template
     * @return a plain String/String Kafka template
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    /** Declares the request topic with one partition and one replica. */
    @Bean
    public NewTopic GetProducts() {
        return TopicBuilder
                .name(ProductTopicConstants.GET_PRODUCTS)
                .partitions(1)
                .replicas(1)
                .build();
    }

    /** Declares the reply topic with one partition and one replica. */
    @Bean
    public NewTopic GetProductsContainer() {
        return TopicBuilder
                .name(ProductTopicConstants.GET_PRODUCTS_CONTAINER)
                .partitions(1)
                .replicas(1)
                .build();
    }
}
英文:

I am using RequestReplyFuture<String, String, List<Product>> to map the response to a List<Product>, but the result looks like the following:

RequestReplyFuture<String, String, List<Product>> not mapped, Instead it mapped to ArrayList<LinkedHashMap>

    @Service
    public class ProductProducer implements IProductProducer{
      private final ReplyingKafkaTemplate&lt;String, String, List&lt;Product&gt;&gt; _replyTemplate;
        private static final Logger LOG = LoggerFactory.getLogger(ProductProducer.class);
        public ProductProducer(ReplyingKafkaTemplate&lt;String, String, List&lt;Product&gt;&gt; replyTemplate) {
            this._replyTemplate = replyTemplate;
        }
    
     @Override
        public List&lt;ProductViewModel&gt; GetProducts() throws InterruptedException, ExecutionException, TimeoutException {
            RequestReplyFuture&lt;String, String, List&lt;Product&gt;&gt; future =
                    this._replyTemplate.sendAndReceive(new ProducerRecord&lt;&gt;(ProductTopicConstants.GET_PRODUCTS, 0, null, null));
                LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
                List&lt;Product&gt; products = future.get(10, TimeUnit.SECONDS).value(); --&gt; Property not mapped to Product
var productViewModels = products.stream().map(item -&gt; new ProductViewModel(item.getId(),item.getName(),item.getPrice(), item.getDescription())).collect(Collectors.toList());
                return productViewModels;
        }
    }

Kafka Configuration

@Configuration
public class KafkaConfiguration {
    @Bean
    public ReplyingKafkaTemplate&lt;String, String, List&lt;Product&gt;&gt; replyer(ProducerFactory&lt;String, String&gt; pf,
                                                                        ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;Product&gt;&gt; containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer&lt;String, List&lt;Product&gt;&gt; container = replyContainer(containerFactory);
        ReplyingKafkaTemplate&lt;String, String, List&lt;Product&gt;&gt; replyer = new ReplyingKafkaTemplate&lt;&gt;(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer&lt;String, List&lt;Product&gt;&gt; replyContainer(
            ConcurrentKafkaListenerContainerFactory&lt;String, List&lt;Product&gt;&gt; containerFactory) {

        ConcurrentMessageListenerContainer&lt;String, List&lt;Product&gt;&gt; container =
                containerFactory.createContainer(ProductTopicConstants.GET_PRODUCTS_CONTAINER);
        container.getContainerProperties().setGroupId(ProductTopicConstants.GET_PRODUCTS_CONTAINER);
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate&lt;String, String&gt; kafkaTemplate(ProducerFactory&lt;String, String&gt; pf) {
        return new KafkaTemplate&lt;&gt;(pf);
    }



   @Bean
    public NewTopic GetProducts() {
        return TopicBuilder.name(ProductTopicConstants.GET_PRODUCTS).partitions(1).replicas(1).build();
    }
    @Bean
    public NewTopic GetProductsContainer() {
        return TopicBuilder.name(ProductTopicConstants.GET_PRODUCTS_CONTAINER).partitions(1).replicas(1).build();
    }
}

答案1

得分: 0

由于类型擦除,消息头(headers)中的类型信息会让 Jackson 认为它是 List<Object>。

相反地,您可以使用一个类型函数,为 Jackson 提供更多信息...

spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.SomeClass.returnType
/**
 * Type function referenced by the {@code spring.json.value.type.method} property.
 *
 * @param data    raw record payload (not inspected)
 * @param headers record headers (not inspected)
 * @return the Jackson type describing a {@code List} whose elements are {@code Product}
 */
public static JavaType returnType(byte[] data, Headers headers) {
	TypeFactory typeFactory = TypeFactory.defaultInstance();
	return typeFactory.constructCollectionLikeType(List.class, Product.class);
}
英文:

Due to type erasure, the type information in the headers makes Jackson think it's a List<Object>, so the elements are not converted to Product.

You can use a type function instead, to give Jackson some more information...

spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.SomeClass.returnType
/**
 * Type function referenced by the {@code spring.json.value.type.method} property.
 *
 * @param data    raw record payload (not inspected)
 * @param headers record headers (not inspected)
 * @return the Jackson type describing a {@code List} whose elements are {@code Product}
 */
public static JavaType returnType(byte[] data, Headers headers) {
	final TypeFactory factory = TypeFactory.defaultInstance();
	final JavaType productListType = factory.constructCollectionLikeType(List.class, Product.class);
	return productListType;
}

huangapple
  • 本文由 发表于 2020年7月26日 17:14:07
  • 转载请务必保留本文链接:https://java.coder-hub.com/63098250.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定