I have a problem when working with readStream().format("kafka")
Question
Please help me fix this error:
20/04/09 18:38:44 ERROR MicroBatchExecution: Query [id = 9f3cbbf6-85a8-4aed-89c6-f5d3ff9c40fa, runId = 73c071c6-e222-4760-a750-393666a298af] terminated with error
java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
20/04/09 18:38:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
My code is:
SQLContext sqlContext = new SQLContext(HadoopWork.sc);
sqlContext
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", IKafkaConstants.KAFKA_BROKERS)
.option("subscribe", IKafkaConstants.TOPIC_NAME)
.option("startingOffsets", "earliest")
.option("enable.auto.commit", false)
.option("checkpointLocation", "/tmp/checkpoint/1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("parquet")
.outputMode("append")
.option("checkpointLocation", "/tmp/checkpoint/2")
.option("compression", "gzip")
.option("path", "/user/test/")
.option("enable.auto.commit", false)
.option("startingOffsets", "earliest")
.start();
Answer 1

Score: 0
It seems to be an older, known bug in Structured Streaming, where an offset restored from the checkpoint (a SerializedOffset) is cast directly to the DataSourceV2 Offset type, exactly as the stack trace shows.
Upgrading to Spark 2.3.2 or later should resolve it.
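Beyond the version bump, two things in the posted code are worth tidying up, even though they are not the cause of the crash: checkpointLocation is a writer option (the reader silently ignores it), and startingOffsets / enable.auto.commit are Kafka source options, so they do nothing on the parquet sink. The Kafka source also manages offsets itself, so enable.auto.commit is unnecessary (Kafka consumer properties would need the kafka. prefix to take effect at all). Below is a minimal sketch of the cleaned-up pipeline on Spark 2.3.2+, using SparkSession instead of the deprecated SQLContext; the class name, broker list, and topic name are placeholders standing in for the asker's IKafkaConstants values, and the spark-sql-kafka-0-10 artifact must be on the classpath at the same version as Spark itself.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class KafkaToParquet {
    public static void main(String[] args) throws Exception {
        // SparkSession is the modern entry point; SQLContext is deprecated.
        SparkSession spark = SparkSession.builder()
                .appName("kafka-to-parquet")
                .getOrCreate();

        // Kafka source: startingOffsets belongs here, on the reader.
        Dataset<Row> kafka = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker1:9092") // placeholder for IKafkaConstants.KAFKA_BROKERS
                .option("subscribe", "my-topic")                   // placeholder for IKafkaConstants.TOPIC_NAME
                .option("startingOffsets", "earliest")
                .load();

        // Parquet sink: checkpointLocation belongs here, on the writer.
        StreamingQuery query = kafka
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .writeStream()
                .format("parquet")
                .outputMode("append")
                .option("checkpointLocation", "/tmp/checkpoint/kafka-to-parquet")
                .option("compression", "gzip")
                .option("path", "/user/test/")
                .start();

        query.awaitTermination();
    }
}

One more caveat: since the failing SerializedOffset is read back from the checkpoint directory, a checkpoint written by the old version may still trip up the upgraded job. If the error persists after upgrading, starting with a fresh checkpointLocation (and letting startingOffsets pick the initial position) is a common workaround.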