
Using sparkSession.sql leads to a "Task not serializable" error

I am trying to execute the following code:

```java
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;

SparkSession sparkSession = SparkSession
        .builder()
        .appName("test")
//      .master("local")
        .enableHiveSupport()
        .getOrCreate();

Dataset<Row> df = sparkSession.sql("## My SQL query to HIVE here ##").toDF();

// The Groovy rule source is stored in a Hive table; fetch it and compile it
// once on the driver.
Dataset<Row> rulesDataset = sparkSession.sql("select * from rules.md");
String code = rulesDataset.collectAsList().get(0).get(0).toString();
df.show(false);
Script script = new GroovyShell().parse(code);

// The UDF closes over the compiled Script instance.
UDF3<Double, String, String, String> rulesExecutingUDF = new UDF3<Double, String, String, String>() {
    @Override
    public String call(Double val1, String val2, String val3) throws Exception {
        Binding binding = new Binding();
        binding.setVariable("VAL1", val1);
        binding.setVariable("VAL2", val2);
        binding.setVariable("VAL3", val3);
        script.setBinding(binding);
        Object value = script.run();
        return value.toString();
    }
};
sparkSession.udf().register("rulesExecutingUDF", rulesExecutingUDF, DataTypes.StringType);
df = df.withColumn("NEW_COL", callUDF("rulesExecutingUDF", col("VAL1"), col("VAL2"), col("VAL3")));
df.show();
```




The issue is that I get a serialization error here, saying the task is not serializable. After a lot of trial and error, I found that the statement

```java
Dataset<Row> df = sparkSession.sql("## My SQL query to HIVE here ##").toDF();
```

might have something to do with it. This dataset is read from a Hive table on the server.

So I prepared a similar dataset with the same schema. If, instead of that query, I build a hardcoded DataFrame like this:

```java
// In addition to the imports above:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructField[] structFields = new StructField[]{
        new StructField("VAL1", DataTypes.DoubleType, true, Metadata.empty()),
        new StructField("VAL2", DataTypes.StringType, true, Metadata.empty()),
        new StructField("VAL3", DataTypes.StringType, true, Metadata.empty())
};
StructType structType = new StructType(structFields);

List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create(160.0, "X", "I"));
rows.add(RowFactory.create(200.0, "D", "C"));
Dataset<Row> df = sparkSession.createDataFrame(rows, structType);
```

then I do not get the serialization error and the Spark job executes successfully.

The schema of the dataset is the same either way, and the values are the same ones I got from the Hive table, so I cannot see why one works and the other does not. Can someone help me out here?
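Since the schema and values match, the difference presumably lies in how Spark executes `show()` for the two plans, i.e. whether the UDF closure has to be serialized and shipped to executors at all. One way to compare the two variants is to print their plans (a diagnostic step added here for illustration, not part of the original job):

```java
// Print the parsed, analyzed, optimized, and physical plans for df;
// run this for both the Hive-backed and the hardcoded variant and compare.
df.explain(true);
```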

**Debugger message**

    diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2304)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
	at com.test2.FeefactMD.main(FeefactMD.java:65)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
    Caused by: java.io.NotSerializableException: Script1
    Serialization stack:
	- object not serializable (class: Script1, value: Script1@121b5eab)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.test2.FeefactMD, functionalInterfaceMethod=org/apache/spark/sql/api/java/UDF3.call:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/test2/FeefactMD.lambda$main$c068ede9$1:(Lgroovy/lang/Script;Ljava/lang/Double;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/Double;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.test2.FeefactMD$$Lambda$61/1143680308, com.test2.FeefactMD$$Lambda$61/1143680308@1fe9c374)
	- field (class: org.apache.spark.sql.UDFRegistration$$anonfun$30, name: f$20, type: interface org.apache.spark.sql.api.java.UDF3)
	- object (class org.apache.spark.sql.UDFRegistration$$anonfun$30, <function3>)
	- element of array (index: 5)
	- array (class [Ljava.lang.Object;, size 6)
	- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
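
Reading the `Serialization stack` above: the non-serializable object is `Script1` — the class GroovyShell generates for the parsed rule — captured as element 0 of the UDF lambda's `capturedArgs`; the Hive query itself never appears in the stack. As a minimal sketch (a suggestion, not from the original post), one way to avoid capturing the compiled script is to close over only the rule source string, which is serializable, and compile it lazily on each executor:

```java
// Sketch: capture only the (serializable) Groovy source string and compile it
// lazily per JVM, so no Script instance travels with the UDF closure.
final String ruleSource = code;  // the rule text fetched from rules.md above
UDF3<Double, String, String, String> rulesExecutingUDF =
        new UDF3<Double, String, String, String>() {
            // transient: skipped by serialization, rebuilt on first use
            private transient Script compiled;

            @Override
            public String call(Double val1, String val2, String val3) throws Exception {
                if (compiled == null) {
                    compiled = new GroovyShell().parse(ruleSource);
                }
                Binding binding = new Binding();
                binding.setVariable("VAL1", val1);
                binding.setVariable("VAL2", val2);
                binding.setVariable("VAL3", val3);
                compiled.setBinding(binding);
                return compiled.run().toString();
            }
        };
```

Note that each deserialized copy of the UDF compiles its own `Script`; `Script` instances are not thread-safe, and this sketch does not attempt to share them.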





