无法在使用TopologyTestDriver时强制执行窗口抑制。

huangapple 未分类评论50阅读模式
英文:

Unable to force window suppression when using TopologyTestDriver

问题

Topology中的代码段:

builder.<String, String>stream(someTopic)
            .filter((k, v) -> !k.equals("heartbeat"))
            .filter((k, v) -> v != null)
            .filter(this::isRedactedInstance)
            .peek(this::updateRedactedCache)
            .selectKey(this::getKeyFromRedacted)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
            .aggregate(() -> null, this::aggregateRedactedByKey)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .peek(this::deleteme);

用于测试的单元测试代码:

TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst1), 
    getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst2), 
    getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst3), 
    getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));
  1. 当我以调试模式运行测试,并在"deleteMe"方法处设置断点时,它从未触发。

  2. 当我在"aggregate"方法中添加断点时,按预期触发(三次)。

  3. 如果我逐步慢慢执行测试,我确实会触发"deleteMe"方法中的断点。

我尝试过推进系统时间,但我了解这与窗口抑制无关(而且除此之外也不起作用)。

我不确定还能尝试什么 - 我本以为第三个具有超长时间戳的事件会触发抑制机制。

英文:

Topology in question:

builder.&lt;String, String&gt;stream(someTopic)
            .filter((k, v) -&gt; !k.equals(&quot;heartbeat&quot;))
            .filter((k, v) -&gt; v != null)
            .filter(this::isRedactedInstance)
            // update our current cache of &quot;known&quot; redacted records
            .peek(this::updateRedactedCache)
            // change key so we can properly do the join further down
            .selectKey(this::getKeyFromRedacted)
            // now we slice the stream into 10 sec long slices and aggregate by key
            // Our redacted comes in every 10 seconds, so we should never get more than one set of redacted per cluster
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
            .aggregate(() -&gt; null, this::aggregateRedactedByKey)
            // the suppress allows us to ignore intermediate records and only get the final 10 second worth aggregation
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .peek(this::deleteme)

The unit test being used to test is:

TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst1), 
    getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst2), 
    getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst3), 
    getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));
  1. When I run the test in debug mode, with a breakpoint at the "deleteMe" method, it is never hit.

  2. When I add breakpoints in the "aggregate" method, they are hit as expected (thrice)

  3. If I step through the test slowly enough, I do hit the breakpoint in the "deleteMe" method.

I have tried advancing the wallclock, but I understand that's not relevant to window suppression (and it didn't work besides).

I'm not sure what else to try - I would have expected the third event, with a super long timestamp, to have triggered the suppression.

huangapple
  • 本文由 发表于 2020年4月7日 02:55:01
  • 转载请务必保留本文链接:https://java.coder-hub.com/61066969.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定