Incomplete/inconsistent state in RichCoFlatMapFunction

Question

I have a RichCoFlatMapFunction that looks like the one below. I store state in a MapState<String, ArrayList<Operand>>, where Operand is a POJO.

I set the state in flatMap1 and read it in flatMap2. However, when I print the contents of the state from flatMap2, not all of the elements of the MapState show up.

In flatMap1, where I set the state, I also print the contents of the state right after calling setOperandsState, and there all of the elements are shown, unlike in flatMap2.

Note that I tested this with a continuous stream. I also ran the code with a single key (i.e. the data stream ends up in only one partition). Am I missing something here?

import java.util.ArrayList;
import java.util.HashMap;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public final class OrganizationFilter
    extends RichCoFlatMapFunction<
        Threshold,
        Point,
        Point
    > {

    // Keyed state: one map per key of the connected, keyed streams.
    private transient MapState<String, ArrayList<Operand>> operands;

    // Replaces the whole state of the current key with a freshly built map.
    private void setOperandsState(Threshold threshold) throws Exception {

        HashMap<String, ArrayList<Operand>> newState = new HashMap<>();
        // set newState here.

        operands.clear();
        operands.putAll(newState);
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, ArrayList<Operand>> operandsDescriptor =
            new MapStateDescriptor<>(
                "operands",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<ArrayList<Operand>>() {})
            );

        operands = getRuntimeContext().getMapState(operandsDescriptor);
    }

    @Override
    public void flatMap1(Threshold threshold, Collector<Point> out) throws Exception {
        setOperandsState(threshold);

        // Iterated through MapState content. Shows complete elements.

    }

    @Override
    public void flatMap2(Point in, Collector<Point> out) throws Exception {
        // Iterated through MapState contents. Shows incomplete elements

    }
}
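
One thing worth checking with a setup like the one above: Flink keyed state such as MapState is scoped to the key of the element currently being processed, so flatMap1 and flatMap2 only see the same map when both connected streams are keyed by the same value. Whether that explains the behavior here cannot be told from the snippet. The following is only a plain-Java sketch of that scoping rule, not Flink code; the class KeyedStateScopeSketch, the methods onThreshold and onPoint, and the keys "org-1" and "org-2" are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for per-key MapState scoping. Hypothetical, NOT the Flink API. */
final class KeyedStateScopeSketch {

    // One independent map per key, mimicking how keyed state is partitioned.
    private final Map<String, Map<String, List<String>>> statePerKey = new HashMap<>();

    // Hypothetical counterpart of flatMap1: replace the state of the current key.
    void onThreshold(String currentKey, Map<String, List<String>> newState) {
        Map<String, List<String>> state =
            statePerKey.computeIfAbsent(currentKey, k -> new HashMap<>());
        state.clear();
        state.putAll(newState);
    }

    // Hypothetical counterpart of flatMap2: read the state of the current key only.
    Map<String, List<String>> onPoint(String currentKey) {
        return statePerKey.getOrDefault(currentKey, new HashMap<>());
    }

    public static void main(String[] args) {
        KeyedStateScopeSketch sketch = new KeyedStateScopeSketch();

        Map<String, List<String>> newState = new HashMap<>();
        newState.put("a", new ArrayList<>(Arrays.asList("op1", "op2")));
        newState.put("b", new ArrayList<>(Arrays.asList("op3")));

        // State is written while the current key is "org-1".
        sketch.onThreshold("org-1", newState);

        // Same key: both map entries are visible.
        System.out.println(sketch.onPoint("org-1").size()); // prints 2
        // Different key: the map for that key is empty.
        System.out.println(sketch.onPoint("org-2").size()); // prints 0
    }
}
```

If the two streams in the real job were keyed by different fields, or by fields that yield different values for related records, flatMap2 would read a different (possibly partially filled) map than the one flatMap1 just wrote. Printing the current key next to the state contents in both methods is one way to rule this in or out.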

huangapple
  • Posted on July 25, 2020, 06:43:33
  • When reposting, please keep the link to this article: https://java.coder-hub.com/63082343.html