当使用readStream().format(“kafka”)时,我遇到了一个问题。

huangapple 未分类评论42阅读模式
英文:

I have a problem when working with readStream().format("kafka")

问题

请帮忙修复错误:

错误信息:

20/04/09 18:38:44 ERROR MicroBatchExecution: 查询 [id = 9f3cbbf6-85a8-4aed-89c6-f5d3ff9c40fa, runId = 73c071c6-e222-4760-a750-393666a298af] 因错误而终止
java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset 无法转换为 org.apache.spark.sql.sources.v2.reader.streaming.Offset
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
        ...
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        ...
20/04/09 18:38:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint 已停止!

代码部分:

SQLContext sqlContext = new SQLContext(HadoopWork.sc);
sqlContext
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", IKafkaConstants.KAFKA_BROKERS)
    .option("subscribe", IKafkaConstants.TOPIC_NAME)
    .option("startingOffsets",  "earliest")
    .option("enable.auto.commit", false)
    .option("checkpointLocation", "/tmp/checkpoint/1")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream()
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoint/2")
    .option("compression", "gzip")
    .option("path", "/user/test/")
    .option("enable.auto.commit", false)
    .option("startingOffsets",  "earliest")
    .start();

请注意,上述内容是您提供的内容的中文翻译,不包括代码部分以外的任何其他回答。

英文:

Help please fix the error:

20/04/09 18:38:44 ERROR MicroBatchExecution: Query [id = 9f3cbbf6-85a8-4aed-89c6-f5d3ff9c40fa, runId = 73c071c6-e222-4760-a750-393666a298af] terminated with error
java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
20/04/09 18:38:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

My code is:

   SQLContext sqlContext = new SQLContext(HadoopWork.sc);
    sqlContext
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", IKafkaConstants.KAFKA_BROKERS)
            .option("subscribe", IKafkaConstants.TOPIC_NAME)
            .option("startingOffsets",  "earliest")
            .option("enable.auto.commit", false)
            .option("checkpointLocation", "/tmp/checkpoint/1")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .writeStream()
            .format("parquet")
            .outputMode("append")
            .option("checkpointLocation", "/tmp/checkpoint/2")
            .option("compression", "gzip")
            .option("path", "/user/test/")
            .option("enable.auto.commit", false)
            .option("startingOffsets",  "earliest")
            .start();

答案1

得分: 0

这似乎是在结构化流处理中一个早已知的错误


升级到 Spark 2.3.2+ 应该能解决这个问题。

英文:

It seems to be an older known bug in Structured Streaming.


Upgrading to Spark 2.3.2+ should do the trick for you

huangapple
  • 本文由 发表于 2020年4月9日 23:57:35
  • 转载请务必保留本文链接:https://java.coder-hub.com/61125231.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定