testHarness timer not getting deleted in Flink 1.8.2
Question
<details>
<summary>English:</summary>
I am using testHarness to test my Custom Trigger. The simplified snippet is attached below:
```java
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomTrigger extends Trigger<InputPOJO, TimeWindow> {

    private final ReducingStateDescriptor<Long> previousTriggerDesc =
            new ReducingStateDescriptor<>("previous-trigger", new Max(), LongSerializer.INSTANCE);

    private final long allowedLatenessMillis;

    public CustomTrigger(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    @Override
    public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
        Long previousTriggerTime = previousTriggerState.get();

        // Remove the previous timer; otherwise the trigger would fire twice.
        if (previousTriggerTime != null) {
            ctx.deleteProcessingTimeTimer(previousTriggerTime); // NOTE
            System.out.println("deleteProcessingTimeTimer(previousTriggerTime)" + previousTriggerTime); // Invoked
        }

        // Register a new timer for the current InputPOJO.
        long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
        ctx.registerProcessingTimeTimer(currentTriggerTime);

        // Store currentTriggerTime in previousTriggerState.
        previousTriggerState.add(currentTriggerTime);
        return TriggerResult.CONTINUE;
    }
    ...
}
```
In the custom trigger, I register a new timer for every new InputPOJO. Whenever I register a timer, I delete the previous one (based on `previousTriggerTime`, saved in the reducing state).
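The intended bookkeeping can be sketched outside Flink as a plain in-memory model. This is only an illustration of the behaviour described above, not Flink's API: `ReplaceTimerModel`, `numTimers()` and `previousTriggerTime()` are hypothetical names, and the model assumes that `Max` keeps the larger of the two values and that every timer lives in one shared set.

```java
import java.util.TreeSet;

// Hypothetical stand-in for the trigger's timer bookkeeping; not a Flink class.
final class ReplaceTimerModel {
    private final long allowedLatenessMillis;
    // Pending timer timestamps (assumption: a single shared set of timers).
    private final TreeSet<Long> timers = new TreeSet<>();
    // Plays the role of the "previous-trigger" reducing state.
    private Long previousTriggerTime;

    ReplaceTimerModel(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    // Mirrors onElement: drop the previous timer, then register a new one.
    void onElement(long currentProcessingTime) {
        if (previousTriggerTime != null) {
            timers.remove(previousTriggerTime);
        }
        long currentTriggerTime = currentProcessingTime + allowedLatenessMillis;
        timers.add(currentTriggerTime);
        // Assumption: the Max reduce function keeps the larger timestamp.
        previousTriggerTime = previousTriggerTime == null
                ? currentTriggerTime
                : Math.max(previousTriggerTime, currentTriggerTime);
    }

    int numTimers() {
        return timers.size();
    }

    Long previousTriggerTime() {
        return previousTriggerTime;
    }

    public static void main(String[] args) {
        ReplaceTimerModel model = new ReplaceTimerModel(3000L);
        model.onElement(1000L); // registers a timer at 4000
        model.onElement(2000L); // removes 4000, registers 5000
        System.out.println(model.numTimers() + " timer(s), latest at " + model.previousTriggerTime());
    }
}
```

In this model exactly one trigger timer survives after two elements. That is the expectation behind the assertion of 2 in the test: one trigger timer plus the timer that the session window itself registers.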
I am testing the timer count (along with the window) using the snippet below.
```java
private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;
private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {
    KeySelector<InputPOJO, String> keySelector = InputPOJO::getKey;
    TypeInformation<InputPOJO> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<InputPOJO>() {}); // Any suggestion ?
    ListStateDescriptor<InputPOJO> stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?

    /**
     * Creating windowOperator for the below function
     *
     * <pre>
     *
     * DataStream<OutputPOJO> OutputPOJOStream =
     *     inputPOJOStream
     *         .keyBy(InputPOJO::getKey)
     *         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *         .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *         .process(new CustomWindowFunction(windowListStateTtlMillis));
     * </pre>
     */
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));

    WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow> operator =
            new WindowOperator<>(
                    // setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
                    ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                    new TimeWindow.Serializer(),
                    // setting .keyBy(InputPOJO::getKey)
                    keySelector,
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    stateDesc,
                    // setting .process(new CustomWindowFunction(windowListStateTtlMillis))
                    new InternalIterableProcessWindowFunction<>(customWindowFunction),
                    // setting .trigger(new CustomTrigger(allowedLateness))
                    new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                    0,
                    null);

    // Creating testHarness for window operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);

    // Setup and Open Test Harness
    testHarness.setup();
    testHarness.open();
}

@Test
public void test_allowedLateness_extension_on_second_pojo() throws Exception {
    int allowedLatenessSeconds = 3;
    int listStateTTL = 10;

    // 1. Arrange
    InputPOJO listStateInput1 = new InputPOJO(1, "Arjun");
    InputPOJO listStateInput2 = new InputPOJO(2, "Arun");

    // 2. Act
    // listStateInput1 comes at 1 sec
    testHarness.setProcessingTime(secondsToMillis(1));
    testHarness.processElement(new StreamRecord<>(listStateInput1));

    // listStateInput2 comes at 2 sec, i.e. in the allowedLateness period of listStateInput1
    testHarness.setProcessingTime(secondsToMillis(2));
    testHarness.processElement(new StreamRecord<>(listStateInput1));

    // Expectation: listStateInput2 deletes the existing untriggered timer of listStateInput1 and registers a new timer.
    // Actual: listStateInput2 registered a new timer and the total count is 3.
    // NOTE:
    // 1. Here I am using SessionWindow, so by default 1 timer would be registered for SessionGap.
    // 2. Second timer should be the InputPOJO registered timer.
    Assert.assertEquals(2, testHarness.numProcessingTimeTimers()); // FAILS
}
```
Here, `ctx.deleteProcessingTimeTimer(previousTriggerTime);` does get invoked, but the timer count in testHarness still shows 3.
- Is it a bug in testHarness?
- Kindly provide an approach to test timer count with testHarness.
PS:
- Even though this might seem like typical functionality of SessionWindow.Gap(), I am using this custom trigger in a complex calculation. For simplicity, I have reduced the logic to the above.
- I am using `ListStateDescriptor` when creating the `WindowOperator` for testHarness.