英文:
Incomplete/inconsistent state in RichCoFlatMapFunction
问题
以下是您提供的代码的翻译:
public final class OrganizationFilter
extends RichCoFlatMapFunction<
Threshold,
Point,
Point
> {
private transient MapState<String, ArrayList<Operand>> operands;
private void setOperandsState(Threshold threshold) throws Exception {
HashMap<String, ArrayList<Operand>> newState = new HashMap<String, ArrayList<Operand>>();
// 在此处设置 newState。
operands.clear();
operands.putAll(newState);
}
@Override
public void open(Configuration config) {
MapStateDescriptor<String, ArrayList<Operand>> operandsDescriptor =
new MapStateDescriptor<>(
"operands",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<ArrayList<Operand>>() {})
);
operands = getRuntimeContext().getMapState(operandsDescriptor);
}
@Override
public void flatMap1(Threshold threshold, Collector<Point> out) throws Exception {
setOperandsState(threshold);
// 遍历 MapState 内容。显示完整元素。
}
@Override
public void flatMap2(Point in, Collector<Point> out) throws Exception {
// 遍历 MapState 内容。显示不完整元素。
}
}
请注意,这是您提供的代码的直译中文翻译。如果您对代码的功能或结构有任何疑问,请随时提问。
英文:
I have a RichCoFlatMapFunction that looks like below. I store state in a MapState<String, ArrayList<Operand>>
. Operand is a POJO.
I set the state in flatMap1 function and access it at flatMap2. However, I noticed that when I try to print out the content of state from flatMap2, it does not show all of the elements of the MapState.
In flatMap1, where I set the state, I also printed out the content of state right after setOperandState and it shows all the elements unlike in flatMap2.
Note that I tested it with continuous stream. Also, I ran the code on a single key (i.e. datastream results to only one partition). Am I missing something here?
public final class OrganizationFilter
extends RichCoFlatMapFunction<
Threshold,
Point,
Point
> {
private transient MapState<String, ArrayList<Operand>> operands;
private void setOperandsState(Threshold threshold) throws Exception {
HashMap<String, ArrayList<Operand>> newState = new HashMap<String, ArrayList<Operand>>();
// set newState here.
operands.clear();
operands.putAll(newState);
}
@Override
public void open(Configuration config) {
MapStateDescriptor<String, ArrayList<Operand>> operandsDescriptor =
new MapStateDescriptor<>(
"operands",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<ArrayList<Operand>>() {} )
);
operands = getRuntimeContext().getMapState(operandsDescriptor);
}
@Override
public void flatMap1(Threshold threshold, Collector<Point> out) throws Exception {
setOperandsState(threshold);
// Iterated through MapState content. Shows complete elements.
}
@Override
public void flatMap2(Point in, Collector<Point> out) throws Exception {
// Iterated through MapState contents. Shows incomplete elements
}
}
专注分享java语言的经验与见解,让所有开发者获益!
评论