testHarness ListState TTL在Flink 1.8.2上未生效

huangapple 未分类评论45阅读模式
英文:

testHarness ListState TTL not getting applied on Flink 1.8.2

问题

窗口函数代码片段:

public class CustomWindowFunction extends ProcessWindowFunction<InputPOJO, OutputPOJO, String, TimeWindow> {

  ...

  @Override
  public void open(Configuration config) {
    StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(listStateTTl)
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
    listStateDescriptor =  new ListStateDescriptor<>("unprocessedItems", InputPOJO.class);
    listStateDescriptor.enableTimeToLive(ttlConfig);
  }

  @Override
  public void process( String key, Context context, Iterable<InputPOJO> windowElements, Collector<OutputPOJO> out) throws Exception {
    ListState<InputPOJO> listState = getRuntimeContext().getListState(listStateDescriptor);
    
    ....

    Iterator<InputPOJO> iterator;

    iterator = listState.get().iterator();
    while (iterator.hasNext()) {
      InputPOJO listStateInput = iterator.next();
      System.out.println("There are unexpired elements in listState");

      /** Business Logic to compute result using the unexpired values in listState**/
    }

    /** Business Logic to compute result using the current window elements.*/

    iterator = windowElements.iterator();
    while (iterator.hasNext()) {
      System.out.println("unProcessed Item added to ListState.")
      InputPOJO unprocessedItem = iterator.next();
      listState.add(unprocessedItem);
    }
  }
  ....
}
// Harness driving the WindowOperator under test (input InputPOJO, output OutputPOJO).
private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;

// Window function under test; its "unprocessedItems" list state has TTL enabled.
private CustomWindowFunction customWindowFunction;

/**
 * Builds a WindowOperator equivalent to the production keyBy/window/trigger/process pipeline,
 * wraps it in a keyed test harness, and sets up and opens that harness.
 *
 * <p>NOTE: testHarness.setProcessingTime(...) drives the operator's timers and triggers. It does
 * not move the clock that state TTL is checked against.
 */
@Before
public void setup_testHarness() throws Exception {

    // Elided in the original post (e.g. the keySelector and stateDesc definitions).
    ...

    // NOTE(review): windowListStateTtlMillis is passed through secondsToMillis(). If it already
    // holds milliseconds, the TTL becomes 1000x too long -- confirm its unit.
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));

    WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow>
        operator =
            new WindowOperator<>(
                // .window(ProcessingTimeSessionWindows.withGap(...))
                ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                new TimeWindow.Serializer(),
                // .keyBy(...)
                keySelector,
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                // .process(customWindowFunction)
                new InternalIterableProcessWindowFunction<>(customWindowFunction),
                // .trigger(...) -- the custom allowed-lateness handling lives in this trigger.
                new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                // Presumably the operator's built-in allowed lateness (0) and late-data output tag
                // (none) -- verify against the WindowOperator constructor of your Flink version.
                0,
                null);

    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);

    testHarness.setup();
    testHarness.open();
}

@Test
public void test_listStateTtl_exclusion() throws Exception {

  int allowedLatenessSeconds = 3;
  int listStateTTL = 10;

  ...

  testHarness.setProcessingTime(secondsToMillis(1));
  testHarness.processElement(new StreamRecord<>(listStateInput1));

  ...

  testHarness.setProcessingTime(secondsToMillis(4));

  ...

  testHarness.setProcessingTime(secondsToMillis(15));

  ...

  testHarness.setProcessingTime(secondsToMillis(16));
  testHarness.processElement(new StreamRecord<>(listStateInput2));

  ...

  testHarness.setProcessingTime(secondsToMillis(20));

  ...

  List<StreamRecord<? extends T>> streamRecords = testHarness.extractOutputStreamRecords();
}

我注意到设置处理时间时未应用TTL。当我尝试使用Thread.sleep(TTL)执行相同的函数时,结果是符合预期的。

  1. 列表状态(listState)的TTL是否使用系统时间进行清除(使用testHarness)?

  2. 是否有办法使用testHarness测试listState的TTL?

英文:

I am testing a window function which has a listState, with TTL enabled.

Snippet of window function:

public class CustomWindowFunction extends ProcessWindowFunction&lt;InputPOJO, OutputPOJO, String, TimeWindow&gt; {

  ...
  
  @Override
  public void open(Configuration config) {
    StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(listStateTTl)
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // NOTE: NeverReturnExpired
            .build();
    listStateDescriptor =  new ListStateDescriptor&lt;&gt;(&quot;unprocessedItems&quot;, InputPOJO.class);
    listStateDescriptor.enableTimeToLive(ttlConfig);
  }

  @Override
  public void process( String key, Context context, Iterable&lt;InputPOJO&gt; windowElements, Collector&lt;OutputPOJO&gt; out) throws Exception {

    	ListState&lt;InputPOJO&gt; listState = getRuntimeContext().getListState(listStateDescriptor);

    	....

		Iterator&lt;InputPOJO&gt; iterator;

    	// Getting unexpired listStateItems for computation.
	    iterator = listState.get().iterator();
	    while (iterator.hasNext()) {
	    	InputPOJO listStateInput = iterator.next();
	    	System.out.println(&quot;There are unexpired elements in listState&quot;);

	      	/** Business Logic to compute result using the unexpired values in listState**/
	    }

		
		/** Business Logic to compute result using the current window elements.*/

	    // Adding unProcessed WindowElements to ListState(with TTL)
	    // NOTE: processed WindowElements are removed manually.
      	iterator = windowElements.iterator();
	    while (iterator.hasNext()) {
      		System.out.println(&quot;unProcessed Item added to ListState.&quot;)
            InputPOJO unprocessedItem = iterator.next();
      	 	listState.add(unprocessedItem); // This part gets executed for listStateInput1
      	}

   	}
   	....
}

I am using a test harness for the integration test. The test checks the number of items in listState after the listState TTL has expired. Below is a snippet of my test.

NOTE:

  1. There is a custom allowedLateness, which is implemented using a custom Trigger (CustomTrigger).
private OneInputStreamOperatorTestHarness&lt;InputPOJO, OutputPOJO&gt; testHarness;

private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {

    KeySelector&lt;InputPOJO, String&gt; keySelector = InputPOJO::getKey;

    TypeInformation&lt;InputPOJO&gt; STRING_INT_TUPLE = TypeInformation.of(new TypeHint&lt;InputPOJO&gt;() {}); // Any suggestion ?

    ListStateDescriptor&lt;InputPOJO&gt; stateDesc = new ListStateDescriptor&lt;&gt;(&quot;window-contents&quot;, STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?

    /**
     * Creating windowOperator for the below function
     *
     * &lt;pre&gt;
     *
     *      DataStream&lt;OutputPOJO&gt; OutputPOJOStream =
     *         inputPOJOStream
     *             .keyBy(InputPOJO::getKey)
     *             .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *             .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *             .process(new CustomWindowFunction(windowListStateTtlMillis));
     * &lt;/pre&gt;
     */
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));

    WindowOperator&lt;String, InputPOJO, Iterable&lt;InputPOJO&gt;, OutputPOJO, TimeWindow&gt;
        operator =
            new WindowOperator&lt;&gt;(
                // setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
                ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                new TimeWindow.Serializer(),
                // setting .keyBy(InputPOJO::getKey)
                keySelector,
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                // setting  .process(new CustomWindowFunction(windowListStateTtlMillis))
                new InternalIterableProcessWindowFunction&lt;&gt;(customWindowFunction),
                // setting .trigger(new CustomTrigger(allowedLateness))
                new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                0,
                null);

    // Creating testHarness for window operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness&lt;&gt;(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);

    // Setup and Open  Test Harness
    testHarness.setup();

    testHarness.open();
}

@Test
public void test_listStateTtl_exclusion() throws Exception {

	int allowedLatenessSeconds = 3;
	int listStateTTL = 10;

	//1. Arrange
	InputPOJO listStateInput1 = new InputPOJO(1,&quot;Arjun&quot;);
	InputPOJO listStateInput2 = new InputPOJO(2,&quot;Arun&quot;);


	// 2. Act
	// listStateInput1 comes at 1 sec
	testHarness.setProcessingTime(secondsToMillis(1));
	testHarness.processElement(new StreamRecord&lt;&gt;(listStateInput1));


	// Setting current processing time to  1 + 3 = 4 &gt; allowedLateness.
	// Window.process() is called, and window is purged (FIRE_AND_PURGE)
	// Expectation: listStateInput1 is put into listState with TTL (10 secs), before process() ends.
	testHarness.setProcessingTime(secondsToMillis(4));

	// Setting processing time after listStateTTL, ie 4 + listStateTTL(10) + 1 = 15
	// Expectation: listStateInput1 is evicted from the listState  (Fails)
	testHarness.setProcessingTime(secondsToMillis(15));

	// Using sleep(), the listStateTTL is getting applied to listState and listStateInput1 is evicted (Pass)
	//Thread.sleep(secondsToMillis(15))

	//Passing listStateInput2 to the test Harness
	testHarness.setProcessingTime(secondsToMillis(16));
	testHarness.processElement(new StreamRecord&lt;&gt;(listStateInput2));


	// Setting processing time after allowedLateness = 16 + 3 + 1 = 20
	testHarness.setProcessingTime(secondsToMillis(20));

	// 3. Assert
	 List&lt;StreamRecord&lt;? extends T&gt;&gt; streamRecords = testHarness.extractOutputStreamRecords();
	 // Expectation: streamRecords will only contain listStateInput2, since listStateInput1 was evicted.
	 // Actual: Getting both listStateInput1 &amp; listStateInput2 in the output.
}

I noticed that the TTL is not applied when I advance time with testHarness.setProcessingTime(). When I ran the same test with Thread.sleep(TTL) instead, the result was as expected.

  1. Is listState TTL using system time for eviction (with testHarness)?

  2. Is there any way to test listStateTTL using testHarness?

答案1

得分: 0

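TTL测试应按以下方式进行。状态TTL并不使用测试工具(test harness)的处理时间服务(setProcessingTime),而是使用自己的TTL时钟,默认为系统时间。较新版本Flink的测试工具提供 setStateTtlProcessingTime(...) 来控制该时钟;Flink 1.8.2 中可能没有此方法,请核对所用版本。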

    /**
     * Shows how to test state TTL with a test harness. The TTL clock is moved explicitly with
     * setStateTtlProcessingTime(...), which is independent of setProcessingTime(...).
     *
     * <p>NOTE(review): this mirrors a test from Flink's own code base. The harness field
     * {@code config} is presumably not public (usable from the harness's package/subclasses
     * only). OperatorSubtaskState.builder() and setStateTtlProcessingTime(...) may not exist in
     * older releases such as Flink 1.8.2. Confirm all three against your Flink version.
     */
    @Test
    public void testSetTtlTimeProvider() throws Exception {
        AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {};
        // Presumably (maxParallelism, parallelism, subtaskIndex) = (1, 1, 0) -- confirm.
        try (AbstractStreamOperatorTestHarness<Integer> result =
                new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0)) {

            // Configure an Integer key serializer so the operator is given a keyed state backend.
            result.config.setStateKeySerializer(IntSerializer.INSTANCE);
            result.config.serializeAllConfigs();

            Time timeToLive = Time.hours(1);
            result.initializeState(OperatorSubtaskState.builder().build());
            result.open();

            ValueStateDescriptor<Integer> stateDescriptor =
                    new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
            stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(timeToLive).build());
            KeyedStateBackend<Integer> keyedStateBackend = operator.getKeyedStateBackend();
            ValueState<Integer> state =
                    keyedStateBackend.getPartitionedState(
                            VoidNamespace.INSTANCE,
                            VoidNamespaceSerializer.INSTANCE,
                            stateDescriptor);

            int expectedValue = 42;
            keyedStateBackend.setCurrentKey(1);
            // Write the value while the TTL clock reads 0.
            result.setStateTtlProcessingTime(0L);
            state.update(expectedValue);
            Assert.assertEquals(expectedValue, (int) state.value());
            // Move only the TTL clock past the TTL; the expired value is no longer returned.
            result.setStateTtlProcessingTime(timeToLive.toMilliseconds() + 1);
            Assert.assertNull(state.value());
        }
    }
英文:

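A TTL test should be written as follows. State TTL does not use the harness's processing-time service (setProcessingTime). It reads its own TTL clock, which defaults to the system clock; that is why Thread.sleep() worked in the question. Test harnesses in newer Flink releases let you control that clock with setStateTtlProcessingTime(...). This method may not be available in Flink 1.8.2, so check your version.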

 @Test
public void testSetTtlTimeProvider() throws Exception {
    AbstractStreamOperator&lt;Integer&gt; operator = new AbstractStreamOperator&lt;Integer&gt;() {};
    try (AbstractStreamOperatorTestHarness&lt;Integer&gt; result =
            new AbstractStreamOperatorTestHarness&lt;&gt;(operator, 1, 1, 0)) {

        result.config.setStateKeySerializer(IntSerializer.INSTANCE);
        result.config.serializeAllConfigs();

        Time timeToLive = Time.hours(1);
        result.initializeState(OperatorSubtaskState.builder().build());
        result.open();

        ValueStateDescriptor&lt;Integer&gt; stateDescriptor =
                new ValueStateDescriptor&lt;&gt;(&quot;test&quot;, IntSerializer.INSTANCE);
        stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(timeToLive).build());
        KeyedStateBackend&lt;Integer&gt; keyedStateBackend = operator.getKeyedStateBackend();
        ValueState&lt;Integer&gt; state =
                keyedStateBackend.getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        stateDescriptor);

        int expectedValue = 42;
        keyedStateBackend.setCurrentKey(1);
        result.setStateTtlProcessingTime(0L);
        state.update(expectedValue);
        Assert.assertEquals(expectedValue, (int) state.value());
        result.setStateTtlProcessingTime(timeToLive.toMilliseconds() + 1);
        Assert.assertNull(state.value());
    }
}

huangapple
  • 本文由 发表于 2020年4月5日 17:37:33
  • 转载请务必保留本文链接:https://java.coder-hub.com/61040623.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定