java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
Question
I am trying to use a KafkaConsumer to consume records, but I get the exception in the title.
public void kafkaConsumerRun() {
    Properties prop = getProperties();
    // try-with-resources closes the consumer when the loop exits
    try (KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer<>(prop)) {
        kafkaConsumer.subscribe(Arrays.asList(topicName));
        Schema schema = Common.SCHEMA;
        log.info("SCHEMA = " + schema.toString());
        Map<String, Object> eventMap = new LinkedHashMap<>();
        while (true) {
            ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(Duration.ofSeconds(pollInterval));
            JSONObject payloadJson = new JSONObject();
            if (records.count() > 0) {
                log.info(records.count() + " records in partition");
                for (ConsumerRecord<String, GenericRecord> rec : records) {
                    // Copy the Avro record before reading its fields
                    GenericRecord record = SpecificData.get().deepCopy(schema, rec.value());
                    log.info("Printing Events before camel case conversion : " + record.toString());
                    // Collect every field of the record into eventMap
                    for (Schema.Field field : record.getSchema().getFields()) {
                        String fieldName = field.name();
                        Object value = record.get(fieldName);
                        getFieldName(fieldName, eventMap, value);
                    }
                    log.info("JSON String: " + new JSONObject(eventMap).toString());
                    callOrchestrator(eventMap, payloadJson);
                }
            }
        }
    } catch (Exception ex) {
        log.error("Exception Caught: ", ex);
    }
}
Answer 1
Score: 0
The exception stack trace should point you to the exact line that fails, and the message tells you exactly what is happening: an instance of String is being cast to GenericRecord. When you pass an object around as an Object, make sure you cast it back to its proper type when you need it.
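To make the mismatch concrete, here is a minimal sketch that runs without Kafka or Avro. It assumes the cause is a value.deserializer that produces String values while the consumer is declared as KafkaConsumer<String, GenericRecord>; getProperties() is not shown in the question, so this is only a guess. Java generics are erased at runtime, so the declared value type is never checked against what the deserializer returns, and the cast fails only when the value is first used as a GenericRecord (the rec.value() call in the question). FakeRecord, poll, castFails, describe and avroConsumerProps are hypothetical names for illustration. The KafkaAvroDeserializer and schema.registry.url settings apply only if the topic was written with Confluent's Avro serializer and a Schema Registry, which the question does not confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-in for org.apache.avro.generic.GenericRecord,
// so the sketch compiles without Avro or Kafka on the classpath.
interface FakeRecord {
    Object get(String fieldName);
}

class DeserializerMismatchDemo {

    // Mimics poll(): the caller picks V, but nothing checks it at runtime
    // because generic type parameters are erased.
    @SuppressWarnings("unchecked")
    static <V> List<V> poll(Object rawValue) {
        List<Object> values = new ArrayList<>();
        values.add(rawValue);
        return (List<V>) (List<?>) values; // unchecked cast, never fails here
    }

    // Returns true if using the polled value as a FakeRecord throws.
    static boolean castFails(Object rawValue) {
        List<FakeRecord> records = poll(rawValue);
        try {
            FakeRecord record = records.get(0); // the compiler-inserted cast happens here
            record.get("id");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Defensive alternative: inspect the runtime type before casting.
    static String describe(Object rawValue) {
        if (rawValue == null) {
            return "null value";
        }
        if (rawValue instanceof FakeRecord) {
            FakeRecord record = (FakeRecord) rawValue;
            return "record with id=" + record.get("id");
        }
        return "unexpected value type: " + rawValue.getClass().getName();
    }

    // Assumption: the topic holds Avro written by Confluent's serializer.
    // The keys are real Kafka/Confluent config names; the values are examples.
    static Properties avroConsumerProps(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("schema.registry.url", registryUrl);
        return props;
    }

    public static void main(String[] args) {
        FakeRecord real = fieldName -> 42;
        System.out.println(castFails("plain text")); // true
        System.out.println(castFails(real));         // false
        System.out.println(describe("plain text"));  // unexpected value type: java.lang.String
        System.out.println(describe(real));          // record with id=42
    }
}
```

If the producer actually writes plain strings or JSON, the fix goes the other way: declare the consumer as KafkaConsumer<String, String> and parse the value yourself instead of changing the deserializer.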