Improve performance of Kafka listeners in a Spring Boot application deployed in PCF
Question
I am working on a use case that needs to process almost 600 messages/sec (subscribe from a topic, transform, save to a SQL Server table, and produce back to a topic), but we are only processing 100 messages/sec with 5 instances. We cannot add more instances to achieve this. Any suggestions would be helpful.
Tech and infrastructure:
Spring Boot application with Kafka listeners (no batch listener), deployed in PCF. The source and output topics have 10 partitions each. Default properties and settings are used. The transformation takes a fraction of a millisecond.
Answer 1
Score: 0
I had a similar use case. I improved performance by adding concurrency (10) to each listener and increasing the number of partitions on the topic, with the following configuration:
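import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// These methods belong in a @Configuration class. poolSize, poolMaxSize, keepAlive,
// pollTimeout, bootstrapServers and groupID are fields of that class (not shown here).

// Thread pool that runs the consumer threads. Each consumer holds a thread for its
// whole lifetime, so poolSize should be at least the listener concurrency (10).
@Bean
public ThreadPoolTaskExecutor messageProcessorExecutor() {
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(poolSize);
    exec.setMaxPoolSize(poolMaxSize);
    exec.setKeepAliveSeconds(keepAlive);
    return exec;
}

// Consumer factory with String keys and JSON-deserialized Request values.
@Bean
public ConsumerFactory<String, Request> consumerFactory() {
    DefaultKafkaConsumerFactory<String, Request> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
    consumerFactory.setKeyDeserializer(new StringDeserializer());
    consumerFactory.setValueDeserializer(new JsonDeserializer<>(Request.class));
    return consumerFactory;
}

// Listener container factory that starts 10 consumers per listener.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Request>> kafkaListenerContainerFactory(
        ThreadPoolTaskExecutor messageProcessorExecutor,
        ConsumerFactory<String, Request> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    // Depending on the Spring Kafka version, this setter may be deprecated or
    // replaced by setListenerTaskExecutor; check the version in use.
    factory.getContainerProperties().setConsumerTaskExecutor(messageProcessorExecutor);
    return factory;
}

// Basic consumer properties; the value deserializer is set on the factory above.
private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}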
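To see why the concurrency setting and the partition count have to be raised together, here is a rough sketch of the arithmetic. Within one consumer group, Kafka assigns each partition to at most one consumer, so any consumer threads beyond the partition count sit idle. The class and method names below are hypothetical and only for illustration; the figures come from the question (10 partitions, 5 instances), and the concurrency of 1 is an assumption based on the question's "default settings".

```java
public class EffectiveParallelism {

    // Number of consumer threads in the group that can actually be assigned
    // a partition: never more than the partition count of the topic.
    static int activeConsumers(int partitions, int instances, int concurrencyPerInstance) {
        int totalConsumers = instances * concurrencyPerInstance;
        return Math.min(partitions, totalConsumers);
    }

    public static void main(String[] args) {
        // Assumed setup from the question: 10 partitions, 5 instances, concurrency 1.
        System.out.println(activeConsumers(10, 5, 1));   // prints 5
        // Concurrency 2 per instance already covers all 10 partitions.
        System.out.println(activeConsumers(10, 5, 2));   // prints 10
        // Concurrency 10 per instance gives 50 threads, but only 10 get a partition.
        System.out.println(activeConsumers(10, 5, 10));  // prints 10
    }
}
```

Under these assumptions, a concurrency of 10 on each of 5 instances only pays off once the topic has around 50 partitions; with 10 partitions, a concurrency of 2 gives the same number of active consumers.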