来自Kafka的消息未被发送到Clickhouse。

huangapple 未分类评论44阅读模式
英文:

Messages from Kafka aren't being sent to Clickhouse

问题

我的 Java 应用程序向 Apache Kafka 发送消息。

ContainerProperties containerProps = new ContainerProperties("topic1");
final CountDownLatch latch = new CountDownLatch(1);
containerProps.setMessageListener(new MessageListener<Integer, MyData>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, MyData> message) {
        logger.info("received: " + message);
        latch.countDown();
    }

});
KafkaMessageListenerContainer<Integer, MyData> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
KafkaTemplate<Integer, MyData> template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, new MyData("foo"));
template.flush();
container.stop();

消费者属性:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.*");

发送者属性:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

日志中可以看到类似的记录:

received: ConsumerRecord(topic = topic1, partition = 0, leaderEpoch = 0, offset = 42, CreateTime = 1595598507889, serialized key size = 4, serialized value size = 36, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = MyData{name='foo'})

这意味着消息已经成功发送到 Kafka。同时,我在相同的主机上有 ClickHouse 中的表:

CREATE TABLE IF NOT EXISTS schema_name.table (...) ENGINE = Kafka('localhost:9092', 'topic1', 'group1', 'JSONEachRow');

CREATE TABLE IF NOT EXISTS schema_name.table_view (...) ENGINE = MergeTree() ORDER BY datetime;

CREATE MATERIALIZED VIEW schema_name.consumer TO schema_name.table_view AS SELECT * FROM schema_name.table;

ClickHouse 中的表是空的。

英文:

My java application sends messaged to Apache kafka

ContainerProperties containerProps = new ContainerProperties(&quot;topic1&quot;);
final CountDownLatch latch = new CountDownLatch(1);
containerProps.setMessageListener(new MessageListener&lt;Integer, MyData&gt;() {

    @Override
    public void onMessage(ConsumerRecord&lt;Integer, MyData&gt; message) {
        logger.info(&quot;received: &quot; + message);
        latch.countDown();
    }

});
KafkaMessageListenerContainer&lt;Integer, MyData&gt; container = createContainer(containerProps);
container.setBeanName(&quot;testAuto&quot;);
container.start();
KafkaTemplate&lt;Integer, MyData&gt; template = createTemplate();
template.setDefaultTopic(&quot;topic1&quot;);
template.sendDefault(0, new MyData(&quot;foo&quot;));
template.flush();
container.stop();

Consumer properties:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;&lt;my_host&gt;:9092&quot;);
props.put(ConsumerConfig.GROUP_ID_CONFIG, &quot;group1&quot;);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, &quot;100&quot;);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, &quot;15000&quot;);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, &quot;com.*&quot;);

Sender properties:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;&lt;my_host&gt;:9092&quot;);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

I can see in log record like:

received: ConsumerRecord(topic = topic1, partition = 0, leaderEpoch = 0, offset = 42, CreateTime = 1595598507889, serialized key size = 4, serialized value size = 36, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = MyData{name=&#39;foo&#39;})

It means that messages achieve come to Kafka. Also I have tables in Clickhouse on the same host:

CREATE TABLE IF NOT EXISTS schema_name.table (...) ENGINE = Kafka(&#39;localhost:9092&#39;, &#39;topic1&#39;, &#39;group1&#39;, &#39;JSONEachRow&#39;);

CREATE TABLE IF NOT EXISTS schema_name.table_view (...) ENGINE = MergeTree() ORDER BY datetime;

CREATE MATERIALIZED VIEW schema_name.consumer TO schema_name.table_view AS SELECT * FROM schema_name.table;

Tables in clickhouse are empty.

huangapple
  • 本文由 发表于 2020年7月24日 23:29:58
  • 转载请务必保留本文链接:https://java.coder-hub.com/63076726.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定