testHarness ListState TTL not getting applied on Flink 1.8.2
Question
I am testing a window function which has a listState, with TTL enabled.
Snippet of window function:
public class CustomWindowFunction extends ProcessWindowFunction<InputPOJO, OutputPOJO, String, TimeWindow> {
    ...
    @Override
    public void open(Configuration config) {
        StateTtlConfig ttlConfig =
            StateTtlConfig.newBuilder(listStateTTl)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // NOTE: NeverReturnExpired
                .build();
        listStateDescriptor = new ListStateDescriptor<>("unprocessedItems", InputPOJO.class);
        listStateDescriptor.enableTimeToLive(ttlConfig);
    }

    @Override
    public void process(String key, Context context, Iterable<InputPOJO> windowElements, Collector<OutputPOJO> out) throws Exception {
        ListState<InputPOJO> listState = getRuntimeContext().getListState(listStateDescriptor);
        ....
        Iterator<InputPOJO> iterator;
        // Getting unexpired listState items for computation.
        iterator = listState.get().iterator();
        while (iterator.hasNext()) {
            InputPOJO listStateInput = iterator.next();
            System.out.println("There are unexpired elements in listState");
            /* Business logic to compute the result using the unexpired values in listState. */
        }
        /* Business logic to compute the result using the current window elements. */
        // Adding unprocessed window elements to the ListState (with TTL).
        // NOTE: processed window elements are removed manually.
        iterator = windowElements.iterator();
        while (iterator.hasNext()) {
            System.out.println("unProcessed Item added to ListState.");
            InputPOJO unprocessedItem = iterator.next();
            listState.add(unprocessedItem); // This part gets executed for listStateInput1
        }
    }
    ....
}
I am using testHarness to perform the integration test. I am testing the listState item count after the TTL for the listState has expired. Below is my test function snippet.
NOTE: There is a custom allowedLateness, which is implemented using a custom Timer.
private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;
private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {
    KeySelector<InputPOJO, String> keySelector = InputPOJO::getKey;
    TypeInformation<InputPOJO> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<InputPOJO>() {}); // Any suggestion ?
    ListStateDescriptor<InputPOJO> stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?
    /**
     * Creating windowOperator for the below function
     *
     * <pre>
     *
     * DataStream<OutputPOJO> OutputPOJOStream =
     *     inputPOJOStream
     *         .keyBy(InputPOJO::getKey)
     *         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *         .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *         .process(new CustomWindowFunction(windowListStateTtlMillis));
     * </pre>
     */
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));
    WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow>
        operator =
            new WindowOperator<>(
                // setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
                ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                new TimeWindow.Serializer(),
                // setting .keyBy(InputPOJO::getKey)
                keySelector,
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                // setting .process(new CustomWindowFunction(windowListStateTtlMillis))
                new InternalIterableProcessWindowFunction<>(customWindowFunction),
                // setting .trigger(new CustomTrigger(allowedLateness))
                new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                0,
                null);
    // Creating testHarness for window operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);
    // Setup and open test harness
    testHarness.setup();
    testHarness.open();
}

@Test
public void test_listStateTtl_exclusion() throws Exception {
    int allowedLatenessSeconds = 3;
    int listStateTTL = 10;
    // 1. Arrange
    InputPOJO listStateInput1 = new InputPOJO(1, "Arjun");
    InputPOJO listStateInput2 = new InputPOJO(2, "Arun");
    // 2. Act
    // listStateInput1 comes at 1 sec
    testHarness.setProcessingTime(secondsToMillis(1));
    testHarness.processElement(new StreamRecord<>(listStateInput1));
    // Setting current processing time to 1 + 3 = 4 > allowedLateness.
    // Window.process() is called, and window is purged (FIRE_AND_PURGE)
    // Expectation: listStateInput1 is put into listState with TTL (10 secs), before process() ends.
    testHarness.setProcessingTime(secondsToMillis(4));
    // Setting processing time after listStateTTL, i.e. 4 + listStateTTL(10) + 1 = 15
    // Expectation: listStateInput1 is evicted from the listState (Fails)
    testHarness.setProcessingTime(secondsToMillis(15));
    // Using sleep(), the listStateTTL is getting applied to listState and listStateInput1 is evicted (Pass)
    // Thread.sleep(secondsToMillis(15))
    // Passing listStateInput2 to the test harness
    testHarness.setProcessingTime(secondsToMillis(16));
    testHarness.processElement(new StreamRecord<>(listStateInput2));
    // Setting processing time after allowedLateness = 16 + 3 + 1 = 20
    testHarness.setProcessingTime(secondsToMillis(20));
    // 3. Assert
    List<StreamRecord<? extends OutputPOJO>> streamRecords = testHarness.extractOutputStreamRecords();
    // Expectation: streamRecords will only contain listStateInput2, since listStateInput1 was evicted.
    // Actual: Getting both listStateInput1 & listStateInput2 in the output.
}
I noticed that the TTL is not applied when I advance time with setProcessingTime. When I ran the same test with Thread.sleep(TTL) instead, the result was as expected.
- Is listState TTL using system time for eviction (with testHarness)?
- Is there any way to test listState TTL using testHarness?
Answer 1
Score: 0
The TTL should be tested in the following way:
@Test
public void testSetTtlTimeProvider() throws Exception {
    AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {};
    try (AbstractStreamOperatorTestHarness<Integer> result =
            new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0)) {
        result.config.setStateKeySerializer(IntSerializer.INSTANCE);
        result.config.serializeAllConfigs();
        Time timeToLive = Time.hours(1);
        result.initializeState(OperatorSubtaskState.builder().build());
        result.open();
        ValueStateDescriptor<Integer> stateDescriptor =
            new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
        stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(timeToLive).build());
        KeyedStateBackend<Integer> keyedStateBackend = operator.getKeyedStateBackend();
        ValueState<Integer> state =
            keyedStateBackend.getPartitionedState(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                stateDescriptor);
        int expectedValue = 42;
        keyedStateBackend.setCurrentKey(1);
        result.setStateTtlProcessingTime(0L);
        state.update(expectedValue);
        Assert.assertEquals(expectedValue, (int) state.value());
        result.setStateTtlProcessingTime(timeToLive.toMilliseconds() + 1);
        Assert.assertNull(state.value());
    }
}
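The test above advances time with setStateTtlProcessingTime rather than setProcessingTime. Together with the Thread.sleep observation in the question, this suggests that the TTL check reads its own time source, separate from the one that drives timers. The sketch below is a plain-Java model of that idea, not Flink code: Clock, ManualClock, and TtlList are hypothetical names introduced only for illustration, and the expiry rule (written-at + TTL <= now) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, NOT Flink code: a list whose entries expire by TTL,
// reading "now" from an injectable clock.
class TtlClockSketch {

    /** Stand-in for whatever time source the TTL logic consults. */
    interface Clock {
        long now();
    }

    /** Clock that only moves when the test tells it to. */
    static final class ManualClock implements Clock {
        private long millis;

        void set(long millis) {
            this.millis = millis;
        }

        @Override
        public long now() {
            return millis;
        }
    }

    /** List with a per-element TTL; expired elements are never returned. */
    static final class TtlList<T> {
        private final long ttlMillis;
        private final Clock clock;
        private final List<T> values = new ArrayList<>();
        private final List<Long> writtenAt = new ArrayList<>();

        TtlList(long ttlMillis, Clock clock) {
            this.ttlMillis = ttlMillis;
            this.clock = clock;
        }

        void add(T value) {
            values.add(value);
            writtenAt.add(clock.now());
        }

        List<T> get() {
            long now = clock.now();
            List<T> live = new ArrayList<>();
            for (int i = 0; i < values.size(); i++) {
                // Assumed rule: an entry is live while now - writtenAt < ttl.
                if (now - writtenAt.get(i) < ttlMillis) {
                    live.add(values.get(i));
                }
            }
            return live;
        }
    }

    public static void main(String[] args) {
        ManualClock processingTime = new ManualClock(); // the clock the test advances
        ManualClock ttlClock = new ManualClock();       // the clock the TTL check reads

        TtlList<String> state = new TtlList<>(10_000L, ttlClock);
        processingTime.set(4_000L);
        state.add("listStateInput1");

        // Advancing only the timer clock leaves the entry visible.
        processingTime.set(15_000L);
        System.out.println("after timer clock advance: " + state.get().size());

        // Advancing the clock the TTL check reads expires the entry.
        ttlClock.set(15_000L);
        System.out.println("after TTL clock advance: " + state.get().size());
    }
}
```

In this model the entry disappears only when the clock passed to TtlList moves, which mirrors why the answer drives the TTL time explicitly in its test.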