英文:
Messages from Kafka aren't being sent to Clickhouse
问题
我的 Java 应用程序向 Apache Kafka 发送消息。
ContainerProperties containerProps = new ContainerProperties("topic1");
final CountDownLatch latch = new CountDownLatch(1);
containerProps.setMessageListener(new MessageListener<Integer, MyData>() {
@Override
public void onMessage(ConsumerRecord<Integer, MyData> message) {
logger.info("received: " + message);
latch.countDown();
}
});
KafkaMessageListenerContainer<Integer, MyData> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
KafkaTemplate<Integer, MyData> template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, new MyData("foo"));
template.flush();
container.stop();
消费者属性:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.*");
发送者属性:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
日志中可以看到类似的记录:
received: ConsumerRecord(topic = topic1, partition = 0, leaderEpoch = 0, offset = 42, CreateTime = 1595598507889, serialized key size = 4, serialized value size = 36, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = MyData{name='foo'})
这意味着消息已经成功发送到 Kafka。同时,我在相同的主机上有 ClickHouse 中的表:
CREATE TABLE IF NOT EXISTS schema_name.table (...) ENGINE = Kafka('localhost:9092', 'topic1', 'group1', 'JSONEachRow');
CREATE TABLE IF NOT EXISTS schema_name.table_view (...) ENGINE = MergeTree() ORDER BY datetime;
CREATE MATERIALIZED VIEW schema_name.consumer TO schema_name.table_view AS SELECT * FROM schema_name.table;
ClickHouse 中的表是空的。
英文:
My java application sends messaged to Apache kafka
ContainerProperties containerProps = new ContainerProperties("topic1");
final CountDownLatch latch = new CountDownLatch(1);
containerProps.setMessageListener(new MessageListener<Integer, MyData>() {
@Override
public void onMessage(ConsumerRecord<Integer, MyData> message) {
logger.info("received: " + message);
latch.countDown();
}
});
KafkaMessageListenerContainer<Integer, MyData> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
KafkaTemplate<Integer, MyData> template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, new MyData("foo"));
template.flush();
container.stop();
Consumer properties:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.*");
Sender properties:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
I can see in log record like:
received: ConsumerRecord(topic = topic1, partition = 0, leaderEpoch = 0, offset = 42, CreateTime = 1595598507889, serialized key size = 4, serialized value size = 36, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = MyData{name='foo'})
It means that messages achieve come to Kafka. Also I have tables in Clickhouse on the same host:
CREATE TABLE IF NOT EXISTS schema_name.table (...) ENGINE = Kafka('localhost:9092', 'topic1', 'group1', 'JSONEachRow');
CREATE TABLE IF NOT EXISTS schema_name.table_view (...) ENGINE = MergeTree() ORDER BY datetime;
CREATE MATERIALIZED VIEW schema_name.consumer TO schema_name.table_view AS SELECT * FROM schema_name.table;
Tables in clickhouse are empty.
专注分享java语言的经验与见解,让所有开发者获益!
评论