英文:
Unable to force window suppression when using TopologyTestDriver
问题
Topology中的代码段:
builder.<String, String>stream(someTopic)
            .filter((k, v) -> !k.equals("heartbeat"))
            .filter((k, v) -> v != null)
            .filter(this::isRedactedInstance)
            .peek(this::updateRedactedCache)
            .selectKey(this::getKeyFromRedacted)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
            .aggregate(() -> null, this::aggregateRedactedByKey)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .peek(this::deleteme);
用于测试的单元测试代码:
TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst1), 
    getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst2), 
    getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst3), 
    getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));
- 
当我以调试模式运行测试,并在"deleteMe"方法处设置断点时,它从未触发。
 - 
当我在"aggregate"方法中添加断点时,按预期触发(三次)。
 - 
如果我逐步慢慢执行测试,我确实会触发"deleteMe"方法中的断点。
 
我尝试过推进系统时间,但我了解这与窗口抑制无关(而且除此之外也不起作用)。
我不确定还能尝试什么 - 我本以为第三个具有超长时间戳的事件会触发抑制机制。
英文:
Topology in question:
builder.<String, String>stream(someTopic)
            .filter((k, v) -> !k.equals("heartbeat"))
            .filter((k, v) -> v != null)
            .filter(this::isRedactedInstance)
            // update our current cache of "known" redacted records
            .peek(this::updateRedactedCache)
            // change key so we can properly do the join further down
            .selectKey(this::getKeyFromRedacted)
            // now we slice the stream into 10 sec long slices and aggregate by key
            // Our redacted comes in every 10 seconds, so we should never get more than one set of redacted per cluster
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
            .aggregate(() -> null, this::aggregateRedactedByKey)
            // the suppress allows us to ignore intermediate records and only get the final 10 second worth aggregation
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .peek(this::deleteme)
The unit test being used to test is:
TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst1), 
    getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst2), 
    getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst3), 
    getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));
- 
When I run the test in debug mode, with a breakpoint at the "deleteMe" method, it is never hit.
 - 
When I add breakpoints in the "aggregate" method, they are hit as expected (thrice)
 - 
If I step through the test slowly enough, I do hit the breakpoint in the "deleteMe" method.
 
I have tried advancing the wallclock, but I understand that's not relevant to window suppression (and it didn't work besides).
I'm not sure what else to try - I would have expected the third event, with a super long timestamp, to have triggered the suppression.
专注分享java语言的经验与见解,让所有开发者获益!

评论