Spring Kafka polling using Consumer


Question

I am using consumer.poll(Duration d) to fetch records. For testing purposes I have only 10 records in the Kafka topic, spread across 6 partitions. I have disabled auto commit and I am not committing manually either (again, only for testing). When poll is executed it does not fetch data from all partitions; I need to run poll in a loop to get all the data. I have not overridden parameters such as max.poll.records or fetch.max.bytes from their default values. What could be the reason? Please note that this is the only consumer for the given topic and group id, so I expect all partitions to be assigned to it.

Here is part of the code:

private Consumer<String, Object> createConsumer() {
    ConsumerFactory<String, Object> consumerFactory = deadLetterConsumerFactory();
    Consumer<String, Object> consumer = consumerFactory.createConsumer();
    consumer.subscribe(Collections.singletonList(kafkaConfigProperties.getDeadLetterTopic()));
    return consumer;
}

Consumer<String, Object> consumer = null; // declared outside the try so the finally block can close it
try {
    consumer = createConsumer();
    ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(5000));
    processMessages(records, ..., ...);
} catch (Exception e) {
    // exception handling ...
} finally {
    if (consumer != null) {
        consumer.unsubscribe();
        consumer.close();
    }
}
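
For context on the loop mentioned in the question: a single poll() only returns whatever data has already arrived from completed fetch requests, and the first poll after subscribe() is largely spent joining the group and issuing fetches, so partitions tend to come back piecemeal across several calls. The following is a minimal sketch of draining a small test topic in a loop; the class and method names are hypothetical, it assumes a consumer created by createConsumer() above, and it treats an empty batch as "done", which is acceptable for 10 test records but is not a general completeness check.

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Sketch only: polls repeatedly until a poll returns no records within the timeout.
public class PollLoopSketch {

    static void drain(Consumer<String, Object> consumer) {
        int total = 0;
        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(5000));
            if (records.isEmpty()) {
                break; // nothing arrived within the timeout; assume the small test topic is drained
            }
            for (ConsumerRecord<String, Object> record : records) {
                total++;
                System.out.printf("partition=%d offset=%d key=%s%n",
                        record.partition(), record.offset(), record.key());
            }
        }
        System.out.println("Total records read: " + total);
    }
}

With a single consumer in the group, a few iterations of such a loop should surface all 10 records across the 6 partitions even with auto commit disabled, since committed offsets only come into play on restart or rebalance.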

EDIT
Here are the details of the deadLetterConsumerFactory() method:

ConsumerFactory<String, Object> deadLetterConsumerFactory() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    properties.put(SCHEMA_REGISTRY_URL, url);
    properties.put(ProducerConfig.CLIENT_ID_CONFIG,
            "myid" + "-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement());
    properties.put(SSL_ENDPOINT_IDFN_ALGM, alg);
    properties.put(SaslConfigs.SASL_MECHANISM, saslmech);
    properties.put(REQUEST_TIMEOUT, timeout);
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, config);
    properties.put(SECURITY_PROTOCOL, protocol);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupid");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // copy into a Map<String, Object>, which DefaultKafkaConsumerFactory expects
    Map<String, Object> map = new HashMap<>();
    properties.forEach((key, value) -> map.put((String) key, value));
    return new DefaultKafkaConsumerFactory<>(map);
}
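
On the expectation that all six partitions are assigned to this single consumer: with subscribe(), assignment only happens after poll() has driven the group join, so it can be checked right after the first poll. Below is a small diagnostic sketch with hypothetical names, assuming a consumer built from the factory above; it is not part of the original code.

import java.time.Duration;
import java.util.Collections;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Diagnostic sketch: subscribe, poll once so the group join and assignment complete,
// then print the partitions this consumer instance actually owns.
public class AssignmentCheckSketch {

    static void printAssignment(Consumer<String, Object> consumer, String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        consumer.poll(Duration.ofMillis(5000)); // first poll joins the group and triggers assignment
        Set<TopicPartition> assignment = consumer.assignment();
        System.out.println("Assigned partitions: " + assignment); // a lone consumer should own all 6
    }
}

If all six partitions show up here, the need to loop is about fetch timing rather than partition assignment.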

Posted by huangapple on 2020-03-17 01:30:47 · Source: https://java.coder-hub.com/60710572.html