UDF not working with external variable [Java-spark]


Question

I am trying to execute the following code.

I take a dataset as input, run some logic on it with GroovyShell, and add the result as a new column in the dataset.

private static Dataset<Row> addDebitCreditCol(SparkSession sparkSession, Dataset<Row> df, String code) {
    Script script = new GroovyShell().parse(code);
    UDF3 rulesExecutingUDF = (UDF3<Double, String, String, String>) (val1, val2, val3) -> {
        Binding binding = new Binding();
        binding.setVariable("val1", val1);
        binding.setVariable("val2", val2);
        binding.setVariable("val3", val3);
        Object value = script.run();
        return value.toString();
    };
    sparkSession.udf().register("rulesExecutingUDF", rulesExecutingUDF, DataTypes.StringType);
    df = df.withColumn("NEW_COL", callUDF("rulesExecutingUDF", col("val1"), col("val2"), col("val3")));
    return df;
}

When I execute this code, it throws an error saying:

> User class threw exception: org.apache.spark.SparkException: Task not serializable. Caused by: java.io.NotSerializableException: Script1

It looks like I can't use an external variable (here, the Script) inside a UDF. What other alternatives do I have?

I have tried several ways to get this external variable to work, but none of them did, and I can't find an alternative to a UDF that would accomplish this.
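For context, one commonly used workaround for this kind of Task not serializable error, not mentioned in the original post, is to capture only the Groovy source String (which is serializable) in the UDF and parse the Script on the executors. A minimal sketch, assuming the same Spark and Groovy imports as the snippet above; the method name addDebitCreditColReparsing is made up here:

// Illustrative sketch (not from the original post): the lambda captures only the
// String `code`, so nothing non-serializable is pulled into the task closure.
private static Dataset<Row> addDebitCreditColReparsing(SparkSession sparkSession, Dataset<Row> df, String code) {
    UDF3<Double, String, String, String> rulesExecutingUDF = (val1, val2, val3) -> {
        Script script = new GroovyShell().parse(code); // parsed on the executor at call time
        Binding binding = new Binding();
        binding.setVariable("val1", val1);
        binding.setVariable("val2", val2);
        binding.setVariable("val3", val3);
        script.setBinding(binding); // attach the variables before running the script
        return String.valueOf(script.run());
    };
    sparkSession.udf().register("rulesExecutingUDF", rulesExecutingUDF, DataTypes.StringType);
    return df.withColumn("NEW_COL", callUDF("rulesExecutingUDF", col("val1"), col("val2"), col("val3")));
}

Re-parsing per row is slow, since every parse compiles a new class, so in practice the parsed Script would be cached per executor. The answer below avoids the problem differently, by making the Script itself serializable.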


Answer 1

Score: 0

A simple way to solve this may be to use a script base class that itself implements Serializable:

public static abstract class BaseClass extends Script implements Serializable {

}

Then use that class as the superclass of your script:

CompilerConfiguration config = new CompilerConfiguration();
config.setScriptBaseClass("stackoverflow.Main.BaseClass"); // use your FQCN
Script script = new GroovyShell(Main.class.getClassLoader(), config).parse(code);

This makes script a Serializable instance: if you test script instanceof BaseClass and script instanceof Serializable, both return true.
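Putting the answer together with the method from the question, the result would look roughly like the sketch below. The package stackoverflow and class Main are placeholders carried over from the answer's FQCN example, and the script.setBinding(binding) call is added here on the assumption that the script is meant to read val1, val2, and val3 from that Binding:

package stackoverflow;

import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.codehaus.groovy.control.CompilerConfiguration;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

public class Main {

    // Serializable script base class, as suggested in the answer.
    public static abstract class BaseClass extends Script implements Serializable {
    }

    private static Dataset<Row> addDebitCreditCol(SparkSession sparkSession, Dataset<Row> df, String code) {
        CompilerConfiguration config = new CompilerConfiguration();
        config.setScriptBaseClass("stackoverflow.Main.BaseClass"); // the answer's example FQCN; replace with your own
        // The parsed script now extends the Serializable base class, so the UDF closure can be serialized.
        Script script = new GroovyShell(Main.class.getClassLoader(), config).parse(code);
        UDF3<Double, String, String, String> rulesExecutingUDF = (val1, val2, val3) -> {
            Binding binding = new Binding();
            binding.setVariable("val1", val1);
            binding.setVariable("val2", val2);
            binding.setVariable("val3", val3);
            script.setBinding(binding); // attach the variables; the question's snippet created the Binding but never attached it
            return String.valueOf(script.run());
        };
        sparkSession.udf().register("rulesExecutingUDF", rulesExecutingUDF, DataTypes.StringType);
        return df.withColumn("NEW_COL", callUDF("rulesExecutingUDF", col("val1"), col("val2"), col("val3")));
    }
}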

