Spring Kafka polling using Consumer
Question
I am using consumer.poll(Duration d) to fetch records. For testing purposes I have only 10 records in the Kafka topic, spread across 6 partitions. I have disabled auto commit and I am not committing manually either (again, for testing only). When poll is executed, it does not fetch data from all partitions; I need to run poll in a loop to get all the data. I haven't overridden parameters such as max.poll.records or fetch.max.bytes, so they still have their default values. What could be the reason? Please note that this is the only consumer for the given topic and group id, so I expect all the partitions to be assigned to it.
Here is part of the code:
    private Consumer<String, Object> createConsumer() {
        ConsumerFactory<String, Object> consumerFactory = deadLetterConsumerFactory();
        Consumer<String, Object> consumer = consumerFactory.createConsumer();
        // Subscribe to the dead-letter topic
        consumer.subscribe(Collections.singletonList(kafkaConfigProperties.getDeadLetterTopic()));
        return consumer;
    }

    try {
        consumer = createConsumer();
        // Single poll with a 5-second timeout
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(5000));
        processMessages(records, ...);
    } catch (Exception e) {
        // ...
    } finally {
        if (consumer != null) {
            consumer.unsubscribe();
            consumer.close();
        }
    }
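For reference, the "poll in a loop" workaround mentioned above can be sketched as follows. As far as I understand, poll returns as soon as any fetched records are available, so a single call may cover only some partitions. This is only a minimal sketch, not Kafka API: the class PollDrain, the method drain and the maxEmptyPolls stopping rule are hypothetical names introduced for illustration, and the poll function is a stand-in for consumer.poll so the example runs without a broker.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper: keeps polling until several consecutive polls come back empty.
class PollDrain {

    // poll: stand-in for consumer.poll(timeout), returning the records of one call.
    // maxEmptyPolls: assumed stopping rule, number of consecutive empty polls to tolerate.
    static <T> List<T> drain(Function<Duration, List<T>> poll,
                             Duration timeout,
                             int maxEmptyPolls) {
        List<T> all = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            List<T> batch = poll.apply(timeout);
            if (batch.isEmpty()) {
                emptyPolls++;          // nothing arrived within this poll
            } else {
                emptyPolls = 0;        // data arrived, so reset the counter
                all.addAll(batch);
            }
        }
        return all;
    }

    public static void main(String[] args) {
        // Simulated poll results: data, an empty poll, then more data.
        Deque<List<String>> batches = new ArrayDeque<>();
        batches.add(List.of("r1", "r2"));
        batches.add(List.<String>of());
        batches.add(List.of("r3"));

        List<String> records = drain(
                t -> batches.isEmpty() ? List.<String>of() : batches.poll(),
                Duration.ofMillis(10),
                2);
        System.out.println(records);
    }
}
```

With a real consumer, the poll function could copy the result of consumer.poll(timeout) into a list, for example by iterating over the returned ConsumerRecords and adding each record. The right value for maxEmptyPolls is a guess that depends on the timeout and on how quickly the brokers respond.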
EDIT
Here are the details of deadLetterConsumerFactory():
    ConsumerFactory<String, Object> deadLetterConsumerFactory() {
        Properties properties = new Properties();
        // Connection, client identity and security settings
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.put(SCHEMA_REGISTRY_URL, url);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,
                "myid" + "-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement());
        properties.put(SSL_ENDPOINT_IDFN_ALGM, alg);
        properties.put(SaslConfigs.SASL_MECHANISM, saslmech);
        properties.put(REQUEST_TIMEOUT, timeout);
        properties.put(SaslConfigs.SASL_JAAS_CONFIG, config);
        properties.put(SECURITY_PROTOCOL, protocol);
        // Deserializers, offset reset policy, commit mode and consumer group
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupid");
        // DefaultKafkaConsumerFactory takes a Map<String, Object>, so copy the properties over
        Map<String, Object> map = new HashMap<>();
        properties.forEach((key, value) -> map.put((String) key, value));
        return new DefaultKafkaConsumerFactory<>(map);
    }