优化在部署在PCF上的Spring Boot应用中的Kafka监听器性能。

huangapple 未分类评论44阅读模式
英文:

Improve performance for Kafka listeners in spring boot application deployed in PCF

问题

我正在处理一个使用案例,需要处理几乎每秒 600 条消息(从主题订阅,进行转换,保存到 SQL Server 表,然后再发布到主题),但我们每个实例只处理每秒 100 条消息,每个实例共有 5 个。我们不能增加更多实例来实现这一点。是否有任何建议?

技术和基础设施
使用 Spring Boot 应用程序和 Kafka 监听器(无批处理监听器),部署在 PCF 上。源和输出主题各自有 10 个分区。使用默认属性和设置。转换只需要几毫秒的时间。

英文:

I am working on use case that need to process almost 600 messages/sec (subscribe from topic, Transform, save to SQL Server table and produce back to topic) but we are only processing 100 messages/sec per 5 instance. we cannot increase more instances to achieve this. Any suggestions will be helpful ?

Tech and Infrastructure:
spring boot application with Kafka listeners( no batch listener) deployed in PCF. source and out topic each with 10 partitions each. Default properties and settings are using. Transformation taking fraction of milli seconds.

答案1

得分: 0

我有一个类似的用例我通过在每个监听器上添加并发性10并增加队列中的分区来提高性能配置如下

@Bean
public ThreadPoolTaskExecutor messageProcessorExecutor() {
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(poolSize);
    exec.setMaxPoolSize(poolMaxSize);
    exec.setKeepAliveSeconds(keepAlive);
    return exec;
}

@Bean
public ConsumerFactory<String, Request> consumerFactory() {
    DefaultKafkaConsumerFactory<String, Request> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
    consumerFactory.setKeyDeserializer(new StringDeserializer());
    consumerFactory.setValueDeserializer(new JsonDeserializer<>(Request.class));
    return consumerFactory;
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Request>> kafkaListenerContainerFactory(
        ThreadPoolTaskExecutor messageProcessorExecutor,
        ConsumerFactory<String, Request> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    factory.getContainerProperties().setConsumerTaskExecutor(messageProcessorExecutor);
    return factory;
}

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
英文:

I had a similar use case, I improve performance adding concurrency (10) to each listener and increasing partitions in the queue with the following configuration

@Bean
public ThreadPoolTaskExecutor messageProcessorExecutor() {
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(poolSize);
    exec.setMaxPoolSize(poolMaxSize);
    exec.setKeepAliveSeconds(keepAlive);
    return exec;
}

@Bean
public ConsumerFactory&lt;String, Request&gt; consumerFactory() {
    DefaultKafkaConsumerFactory&lt;String, Request&gt; consumerFactory = new DefaultKafkaConsumerFactory&lt;&gt;(consumerConfigs());
    consumerFactory.setKeyDeserializer(new StringDeserializer());
    consumerFactory.setValueDeserializer(new JsonDeserializer&lt;&gt;(Request.class));
    return consumerFactory;
}

@Bean
public KafkaListenerContainerFactory&lt;ConcurrentMessageListenerContainer&lt;String, Request&gt;&gt; kafkaListenerContainerFactory(
        ThreadPoolTaskExecutor messageProcessorExecutor,
        ConsumerFactory&lt;String, Request&gt; consumerFactory) {

    ConcurrentKafkaListenerContainerFactory&lt;String, Request&gt; factory = new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    factory.getContainerProperties().setConsumerTaskExecutor(messageProcessorExecutor);
    return factory;
}

private Map&lt;String, Object&gt; consumerConfigs() {
    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}

huangapple
  • 本文由 发表于 2020年7月24日 17:29:56
  • 转载请务必保留本文链接:https://java.coder-hub.com/63070785.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定