Unable to force window suppression when using TopologyTestDriver
Topology in question:
builder.<String, String>stream(someTopic)
    .filter((k, v) -> !k.equals("heartbeat"))
    .filter((k, v) -> v != null)
    .filter(this::isRedactedInstance)
    // update our current cache of "known" redacted records
    .peek(this::updateRedactedCache)
    // change the key so we can properly do the join further down
    .selectKey(this::getKeyFromRedacted)
    // now slice the stream into 10-second windows and aggregate by key;
    // redacted records arrive every 10 seconds, so we should never get more than one set per cluster
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
    .aggregate(() -> null, this::aggregateRedactedByKey)
    // suppress lets us ignore intermediate results and emit only the final aggregate for each 10-second window
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .peek(this::deleteme);
The unit test I am using is:
TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst1),
        getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst2),
        getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst3),
        getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));
- When I run the test in debug mode with a breakpoint in the "deleteme" method, the breakpoint is never hit.
- When I add breakpoints in the "aggregate" method, they are hit as expected (three times).
- If I step through the test slowly enough, I do hit the breakpoint in the "deleteme" method.
I have tried advancing the wall-clock time, but as I understand it that is not relevant to window suppression (and it did not help anyway).
I'm not sure what else to try. I would have expected the third event, whose timestamp is five days later, to have triggered the suppression.
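To make that expectation concrete, here is a minimal plain-Java sketch of the arithmetic I have in mind. It assumes that a tumbling window closes once stream time reaches the window end plus the grace period. It does not use Kafka Streams at all, and the class and method names (WindowCloseSketch, windowStart, closeTime, isClosed) are hypothetical, made up for illustration only.

```java
import java.time.Duration;

// Illustration only: hypothetical helper, not part of Kafka Streams.
final class WindowCloseSketch {

    // Start of the tumbling window containing the timestamp
    // (assumes non-negative timestamps and windows aligned to the epoch).
    public static long windowStart(long timestampMs, long sizeMs) {
        return (timestampMs / sizeMs) * sizeMs;
    }

    // Assumed close time: window end plus grace period.
    public static long closeTime(long timestampMs, long sizeMs, long graceMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs + graceMs;
    }

    // True if the window holding recordTsMs counts as closed at the given stream time.
    public static boolean isClosed(long recordTsMs, long streamTimeMs, long sizeMs, long graceMs) {
        return streamTimeMs >= closeTime(recordTsMs, sizeMs, graceMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(10).toMillis();
        long grace = Duration.ofSeconds(3).toMillis();
        long thirdEvent = Duration.ofDays(5).toMillis();

        System.out.println(closeTime(0L, size, grace));            // 13000
        System.out.println(isClosed(0L, thirdEvent, size, grace)); // true
        System.out.println(isClosed(1L, 1L, size, grace));         // false
    }
}
```

By this reasoning, the windows holding the first two records (timestamps 0 and 1) should be long closed by the time the third record arrives, which is why I expected the suppressed results to reach the "deleteme" step.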