testHarness在flink 1.8.2中无法被删除的计时器。

huangapple 未分类评论43阅读模式
英文:

testHarness timer not getting deleted in flink 1.8.2

问题

我正在使用testHarness来测试我的自定义触发器以下是简化的代码片段

```java
public class CustomTrigger extends Trigger<InputPOJO, TimeWindow> {

    private final ReducingStateDescriptor<Long> previousTriggerDesc = new ReducingStateDescriptor<>("previous-trigger", new Max(), LongSerializer.INSTANCE);

    private final long allowedLatenessMillis;

    public CustomTrigger(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    @Override
    public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
        Long previousTriggerTime = ctx.getPartitionedState(previousTriggerDesc).get();

        if (previousTriggerTime != null) {
            ctx.deleteProcessingTimeTimer(previousTriggerTime);
            System.out.println("deleteProcessingTimeTimer(previousTriggerTime)" + previousTriggerTime);
        }

        long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
        ctx.registerProcessingTimeTimer(currentTriggerTime);

        previousTriggerState.add(currentTriggerTime);

        return TriggerResult.CONTINUE;
    }

    ...
}

在自定义触发器中,我为每个新的InputPOJO注册一个新的定时器。当我注册定时器时,我会删除先前的定时器(基于存储在减少状态中的previousTimerTriggerTime)。

我正在使用以下代码片段测试定时器计数(以及窗口):

private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;
private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {
    // 设置 testHarness 的初始化代码
}

@Test
public void test_allowedLateness_extension_on_second_pojo() throws Exception {
    // 测试方法的代码
    Assert.assertEquals(2, testHarness.numProcessingTimeTimers()); // 失败
}

这里的函数ctx.deleteProcessingTimeTimer(previousTriggerTime);会被触发。但是,testHarness中的定时器计数仍然显示为3。

  1. 这是testHarness中的一个bug吗?

  2. 请提供一种使用testHarness测试定时器计数的方法。

注:

  1. 尽管这可能看起来像是SessionWindow.Gap()的典型功能,但我在复杂的计算中使用了这个自定义触发器。出于简化的目的,我将逻辑减少到了上述内容。

  2. 在为testHarness创建WindowOperator时,我使用了ListStateDescriptor



<details>
<summary>英文:</summary>

I am using testHarness to test my Custom Trigger. The simplified snippet is attached below:

```java
public class CustomTrigger extends Trigger&lt;InputPOJO, TimeWindow&gt; {

	private final ReducingStateDescriptor&lt;Long&gt; previousTriggerDesc = new ReducingStateDescriptor&lt;&gt;( &quot;previous-trigger&quot;, new Max(),LongSerializer.INSTANCE);

	private final long allowedLatenessMillis;


	public CustomTrigger(long allowedLatenessMillis) {
		this.allowedLatenessMillis = allowedLatenessMillis;
	}


	@Override
	public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

		ReducingState&lt;Long&gt; previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
		Long previousTriggerTime = ctx.getPartitionedState(previousTriggerDesc).get();

		// Remove previous Timer trigger. else it will invoke twice.
		if (previousTriggerTime != null) {
			ctx.deleteProcessingTimeTimer(previousTriggerTime);	//NOTE
	        System.out.println(&quot;deleteProcessingTimeTimer(previousTriggerTime)&quot;+previousTriggerTime); // Invoked
		}

		// register new trigger for current InputPOJO.		
		long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
		ctx.registerProcessingTimeTimer(currentTriggerTime);

		// Update currentTriggerTime in previousTriggerState.
		previousTriggerTimeState.add(currentTriggerTime);

		return TriggerResult.CONTINUE;
	}
	
	...
}

In the custom trigger, I am registering a new timer for every new InputPOJO. When I am registering the timer, I am deleting the previous timer (based on the previousTimerTriggerTime, saved in the reduced state).

I am testing the timer count (along with window), using the below snippet.

private OneInputStreamOperatorTestHarness&lt;InputPOJO, OutputPOJO&gt; testHarness;

private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {

	KeySelector&lt;InputPOJO, String&gt; keySelector = InputPOJO::getKey;

	TypeInformation&lt;InputPOJO&gt; STRING_INT_TUPLE = TypeInformation.of(new TypeHint&lt;InputPOJO&gt;() {}); // Any suggestion ?

	ListStateDescriptor&lt;InputPOJO&gt; stateDesc = new ListStateDescriptor&lt;&gt;(&quot;window-contents&quot;, STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?

	/**
	 * Creating windowOperator for the below function
	 *
	 * &lt;pre&gt;
	 *
	 *      DataStream&lt;OutputPOJO&gt; OutputPOJOStream =
	 *         inputPOJOStream
	 *             .keyBy(InputPOJO::getKey)
	 *             .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
	 *             .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
	 *             .process(new CustomWindowFunction(windowListStateTtlMillis));
	 * &lt;/pre&gt;
	 */
	customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));

	WindowOperator&lt;String, InputPOJO, Iterable&lt;InputPOJO&gt;, OutputPOJO, TimeWindow&gt;
	    operator =
	        new WindowOperator&lt;&gt;(
	            // setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
	            ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
	            new TimeWindow.Serializer(),
	            // setting .keyBy(InputPOJO::getKey)
	            keySelector,
	            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
	            stateDesc,
	            // setting  .process(new CustomWindowFunction(windowListStateTtlMillis))
	            new InternalIterableProcessWindowFunction&lt;&gt;(CustomWindowFunction),
	            // setting .trigger(new CustomTrigger(allowedLateness))
	            new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
	            0,
	            null);

	// Creating testHarness for window operator
	testHarness = new KeyedOneInputStreamOperatorTestHarness&lt;&gt;(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);

	// Setup and Open  Test Harness
	testHarness.setup();

	testHarness.open();
}


@Test
public void test_allowedLateness_extension_on_second_pojo() throws Exception {

	int allowedLatenessSeconds = 3;
	int listStateTTL = 10;

	//1. Arrange
	InputPOJO listStateInput1 = new InputPOJO(1,&quot;Arjun&quot;);
	InputPOJO listStateInput2 = new InputPOJO(2,&quot;Arun&quot;);


	// 2. Act
	// listStateInput1 comes at 1 sec
	testHarness.setProcessingTime(secondsToMillis(1));
	testHarness.processElement(new StreamRecord&lt;&gt;(listStateInput1));

	// listStateInput2 comes at 2 sec, ie in the allowedLateness period of listStateInput1
	testHarness.setProcessingTime(secondsToMillis(2));
	testHarness.processElement(new StreamRecord&lt;&gt;(listStateInput1));

	// Expectation : listStateInput2 deletes the existing untriggered timer of listStateInput1 and registers a new timer. 
	// Actual: listStateInput2 registered a new timer and the total count is 3.
	// NOTE: 
    // 1. Here I am using SessionWindow, so by default 1 timer would be registered for SessionGap.
    // 2. Second timer should be the InputPOJO registered timer.
	 Assert.assertEquals(2, testHarness.numProcessingTimeTimers()); // FAILS

}

Here, the functions, ctx.deleteProcessingTimeTimer(previousTriggerTime);, is getting triggered. But still the timerCount in testHarness is showing 3.

  1. Is it a bug in testHarness?

  2. Kindly provide an approach to test timer count with testHarness.

PS:

  1. Even though this might seem like a typical functionality of SessionWindow.Gap(), I am using this Custom Trigger in a complex calculation. For simplicity, I have reduced the logic to the above mentioned.

  2. I am using ListStateDescriptor when creating WindowOperator for testHarness.

huangapple
  • 本文由 发表于 2020年4月5日 18:40:14
  • 转载请务必保留本文链接:https://java.coder-hub.com/61041297.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定