Flink 1.9.0 - State deserialization fails after changing the state object


Question


After changing the state object, our state fails to load (not surprisingly).

Our state object looks something like this:

import java.util.HashSet;
import java.util.Set;

public class StateHolder {
    private Set<Object1> objects1 = new HashSet<>();
    private Set<Object2> objects2 = new HashSet<>();
    // 4 more sets of objects; let's call them Object3 to Object6
    // no-args constructor, and getters and setters
...
}

And it is used like this:

ValueStateDescriptor<StateHolder> aggregateValueStateDescriptor = new ValueStateDescriptor<>(
        getDescriptorNamePrefix(STATE_PREFIX, STATE_NAME_COMMAND_AGGREGATE, DATE_OF_STATE_CREATION),
        TypeInformation.of(new TypeHint<StateHolder>() {})
);
commandAggregateState = getRuntimeContext().getState(aggregateValueStateDescriptor);

Recently, we added a new field to Object3 and gave it a default value. It is a String, defined as: private String newField = "";

After doing so we are getting the following exception:

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)

Evidently, our Sets are being deserialized with KryoSerializer, which I suppose was to be expected, though presumably not ideal.

What I fail to understand is why exactly this is failing and how to tackle it. We added a field to the Object3 class, but deserialization fails on the objects5 field (Set<Object5>). Could the new field cause the buffer to shift unexpectedly, so that the registration ID is read from the wrong position?
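The buffer-shift hypothesis can be illustrated with a toy format. The sketch below is not Kryo's wire format; it only assumes two things Kryo also does here: each object is preceded by a class registration ID (the readClass frame in the stack trace), and an object's fields are written one after another with no field names or tags, as Kryo's default FieldSerializer does. The class IDs, the one-byte-length string encoding and all names are hypothetical. Bytes written with the old Object3 layout are read back with the new layout, which expects one extra string:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Set;

public class PositionalShiftDemo {
    // Hypothetical registration IDs, chosen only for this illustration.
    static final int OBJECT3_ID = 10;
    static final int OBJECT5_ID = 3;
    static final Set<Integer> REGISTERED = Set.of(OBJECT3_ID, OBJECT5_ID);

    // Strings: one length byte, then UTF-8 bytes. No field names or tags are written.
    static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        out.writeByte(b.length);
        out.write(b);
    }

    static String readString(DataInputStream in) throws IOException {
        byte[] b = new byte[in.readUnsignedByte()];
        in.readFully(b);
        return new String(b, StandardCharsets.UTF_8);
    }

    static int readClassId(DataInputStream in) throws IOException {
        int id = in.readUnsignedByte();
        if (!REGISTERED.contains(id)) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return id;
    }

    // Written by the OLD code: Object3 has a single field, "name".
    static byte[] writeOldState(String object3Name, String object5Label) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(OBJECT3_ID);
        writeString(out, object3Name);
        out.writeByte(OBJECT5_ID);
        writeString(out, object5Label);
        out.flush();
        return bytes.toByteArray();
    }

    // Read with the OLD layout: matches the writer, so it works.
    static String readObject5LabelOldSchema(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        readClassId(in);
        readString(in);          // Object3.name
        readClassId(in);
        return readString(in);   // Object5.label
    }

    // Read with the NEW layout: Object3 now also has "newField", absent from the old bytes.
    static String readObject5LabelNewSchema(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        readClassId(in);
        readString(in);          // Object3.name
        readString(in);          // Object3.newField: takes Object5's class ID as a length, then 3 more bytes
        readClassId(in);         // now positioned inside "hello" and reads 'l' (108)
        return readString(in);
    }

    public static void main(String[] args) throws IOException {
        byte[] old = writeOldState("a", "hello");
        System.out.println(readObject5LabelOldSchema(old)); // hello
        try {
            readObject5LabelNewSchema(old);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Encountered unregistered class ID: 108
        }
    }
}
```

The error surfaces while reading a later object, not while reading Object3 itself, which is consistent with the objects5 symptom. The real offsets in your state will differ; the point is only that a positional format gives the reader no way to notice that a field is missing.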

Our POJO conforms to all the rules listed in the documentation (though this may not be relevant, since it is the Kryo deserialization that fails):

* The class is public and standalone (no non-static inner class)
* The class has a public no-argument constructor
* All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.
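The three rules above are mechanical enough to check with reflection. The sketch below is a rough approximation written for illustration, with hypothetical class names; it is not Flink's TypeExtractor, which applies further rules of its own, so a clean result is a hint rather than proof.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PojoRulesCheck {

    // Returns violations of the three quoted rules; an empty list means the class looks like a POJO.
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(clazz.getModifiers())) {
            problems.add("class is a non-static inner class");
        }
        try {
            clazz.getConstructor(); // finds a public no-argument constructor only
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Walk the class and all its superclasses, as the rules require.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m)) continue;
                if (Modifier.isPublic(m) && !Modifier.isFinal(m)) continue;
                if (!hasBeanAccessors(clazz, f)) {
                    problems.add("field '" + f.getName() + "' has no public getter/setter");
                }
            }
        }
        return problems;
    }

    // Java Beans naming: getX() (or isX() for boolean) plus setX(value).
    static boolean hasBeanAccessors(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = hasPublicMethod(clazz, "get" + suffix, 0)
                || (f.getType() == boolean.class && hasPublicMethod(clazz, "is" + suffix, 0));
        boolean setter = hasPublicMethod(clazz, "set" + suffix, 1);
        return getter && setter;
    }

    static boolean hasPublicMethod(Class<?> clazz, String name, int paramCount) {
        for (Method m : clazz.getMethods()) { // public methods, including inherited ones
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) return true;
        }
        return false;
    }

    // Hypothetical example classes for trying the check out.
    public static class GoodHolder {
        private Set<String> names = new HashSet<>();
        public Set<String> getNames() { return names; }
        public void setNames(Set<String> names) { this.names = names; }
    }

    public static class BadHolder {
        private Set<String> names = new HashSet<>(); // no getter or setter
        public BadHolder(int unused) { }            // replaces the implicit no-arg constructor
    }
}
```

Passing these rules only makes StateHolder itself a POJO. Its fields are declared as the Set interface, which Flink cannot analyse as a POJO, so they fall back to a generic type serialized by Kryo; that is consistent with the KryoSerializer frame sitting under PojoSerializer in the stack trace.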

Searching for the exception turned up some suggestions, such as implementing a custom Serializer.
Would you agree that this is the right approach? If so, the only guides I found cover custom Avro and Protobuf serializers, and we use neither to persist state. Our data appears to be serialized with the PojoSerializer, which uses separate KryoSerializers to deserialize our Sets, and one of them is evidently failing. Should we create a new custom PojoSerializer with a custom KryoSerializer for this failing field, and if so, what should that custom KryoSerializer look like? Should we hard-code a registration ID corresponding to java.util.Set?
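Whatever API a custom serializer is written against, an evolvable format has to record enough schema information for new code to read old bytes. The stdlib-only sketch below uses the simplest variant, a version byte in front of each record. VersionedObject3Codec and its methods are hypothetical; they are neither Flink's TypeSerializer API nor Kryo's Serializer API, and this format cannot read state that was already written without a version byte.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionedObject3Codec {

    // Hypothetical stand-in for Object3 after the change.
    public static class Object3 {
        public String name = "";
        public String newField = "";
    }

    // Version 1 had only "name"; version 2 added "newField".
    static final int CURRENT_VERSION = 2;

    static void write(DataOutputStream out, Object3 o) throws IOException {
        out.writeByte(CURRENT_VERSION); // the schema version travels with every record
        out.writeUTF(o.name);
        out.writeUTF(o.newField);
    }

    // Simulates a record produced by the old code, before the field existed.
    static void writeV1(DataOutputStream out, String name) throws IOException {
        out.writeByte(1);
        out.writeUTF(name);
    }

    static Object3 read(DataInputStream in) throws IOException {
        int version = in.readUnsignedByte();
        Object3 o = new Object3();
        o.name = in.readUTF();
        if (version >= 2) {
            o.newField = in.readUTF();
        }
        // For version 1 nothing more is read, so newField keeps its default ""
        // and the stream stays aligned for whatever follows.
        return o;
    }
}
```

Kryo ships serializers built on related ideas, such as CompatibleFieldSerializer (which writes field names) and TaggedFieldSerializer (which writes per-field tags). Switching to either changes the byte layout, though, so it would have to happen before the state is written, or together with a migration of the existing state.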

Thank you for reading all this, and I apologize if this is a duplicate question. I couldn't find one similar enough to this one.

UPDATE

Following the suggestion to move the new field's default-value initialization into the no-args constructor, our original exception is now simply wrapped:

java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at fake.package.pipeline.handler.codebook.function.CommandHandlerCodeBookFunction.initializeState(CommandHandlerCodeBookFunction.java:88)
at fake.package.pipeline.handler.Handler.open(Handler.java:104)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.open(KeyedCoProcessOperator.java:62)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB state.
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:213)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:603)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:532)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:210)
... 21 more
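That the root cause is unchanged is what Java's semantics would predict: a field initializer such as private String newField = ""; is compiled into the class's constructors, so it and an explicit assignment in the no-arg constructor leave a freshly created object in the same state. The sketch below (hypothetical class names) shows that equivalence via reflection, the way a serializer that instantiates through the public no-arg constructor would see it.

```java
public class DefaultValueStyles {

    // Default set by a field initializer, as in the original change.
    public static class WithInitializer {
        private String newField = "";
        public String getNewField() { return newField; }
    }

    // Default assigned in the no-arg constructor, as suggested in the update.
    public static class WithConstructor {
        private String newField;
        public WithConstructor() { newField = ""; }
        public String getNewField() { return newField; }
    }

    // Instantiates through the public no-arg constructor and reads back the default.
    static String defaultOf(Class<?> type) throws ReflectiveOperationException {
        Object instance = type.getConstructor().newInstance();
        return (String) type.getMethod("getNewField").invoke(instance);
    }
}
```

Either way, a serializer that reads newField from the stream overwrites that default with whatever bytes come next, so where the default is set cannot change how old bytes are parsed.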

huangapple
  • Posted on 2020-04-09 18:09:33
  • Original link: https://java.coder-hub.com/61118750.html