Apache Beam/Java: how to set a window/trigger that sends the data only once per window

Question

I have the following pipeline:

```java
Window<String> fixedWindow = Window.<String>into(
        FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
    .triggering(
        AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(options.getWindowDuration()))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes();

PCollectionTuple productProcessorPT = pipeline
    .apply(READ_PRODUCT_FROM_PUBSUB.getName(), PubsubIO.readStrings()
        .fromSubscription(options.getProductSubscription()))
    .apply(PRODUCT_WINDOW.getName(), fixedWindow)
    .apply(PROCESS_PRODUCT.getName(), ParDo.of(new ProductProcessor()))
    .apply(GROUP_PRODUCT_DATA.getName(), GroupByKey.create())
    .apply(COMBINE_PRODUCT_DATA.getName(), ParDo.of(new ProductCombiner())
        .withOutputTags(KV_STRING_OBJECTNODE, TupleTagList.of(PIPELINE_ERROR)));
```

What I want to achieve is a window/trigger that gathers the data every 60 s and then sends it to the next transform. How can I do that? I don't care about the event timestamps.

The code above sends data to the next transform every 60 s, but it also keeps triggering and sending the same data even when no new data comes into the pipeline. I'm not sure why that happens.
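For reference, the early-firing trigger in the pipeline above is AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...). withEarlyFirings fires it repeatedly before the watermark passes the end of the window, and its delay starts when the first element of a pane arrives.

The sketch below is a plain-Java model of that rule in discarding mode, assuming a single key and window and processing time passed in explicitly as milliseconds. The class ProcessingTimeBatcherModel is hypothetical and is not part of Beam. In this model, a pane that receives no elements never sets a timer, so nothing is emitted while no new data arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Beam code; hypothetical class) of a repeated
// AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay) trigger
// in discarding mode, for a single key and a single window.
final class ProcessingTimeBatcherModel {
    private final long delayMillis;
    private final List<String> pane = new ArrayList<>();
    // Long.MAX_VALUE means "no timer set", which happens only while the pane is empty.
    private long fireAtMillis = Long.MAX_VALUE;

    ProcessingTimeBatcherModel(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    void add(String element, long nowMillis) {
        if (pane.isEmpty()) {
            // The first element of a new pane starts the delay.
            fireAtMillis = nowMillis + delayMillis;
        }
        pane.add(element);
    }

    // Called as processing time advances; returns the fired pane, or an empty list.
    List<String> onProcessingTime(long nowMillis) {
        if (nowMillis < fireAtMillis) {
            return List.of();
        }
        List<String> fired = new ArrayList<>(pane);
        pane.clear(); // discarding mode: fired elements are never emitted again
        fireAtMillis = Long.MAX_VALUE;
        return fired;
    }
}
```

Under these assumptions, repeated emissions of the same elements would not come from this trigger rule alone.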

Answer 1

Score: 0

You can remove the triggering and just use FixedWindows, as below, to emit records every 60 seconds:

```java
Window<String> fixedWindow = Window.<String>into(
    FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())));
```

This uses the default trigger and the default handling of late data (zero allowed lateness). In practice, each window's data is emitted once, when the watermark passes the end of the window, and all late events are dropped.
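To illustrate the default behavior described above, here is a plain-Java model of fixed windows with an end-of-window trigger and zero allowed lateness. It is not Beam code, and FixedWindowModel is a hypothetical name. It assumes a single key and event timestamps in milliseconds, and it works as follows:

- Each element goes to the window [start, start + size) that contains its timestamp.
- A window fires once, when the watermark reaches its end.
- Elements for an already-fired window are counted as dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java model (NOT Beam code; hypothetical class) of FixedWindows with the
// default end-of-window trigger and zero allowed lateness, for a single key.
final class FixedWindowModel {
    private final long sizeMillis;
    // Windows that have received elements but have not fired yet, keyed by window start.
    private final TreeMap<Long, List<String>> open = new TreeMap<>();
    private long watermarkMillis = Long.MIN_VALUE;
    private int droppedLate = 0;

    FixedWindowModel(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    // Start of the window [start, start + size) that contains the timestamp.
    long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    void add(String element, long timestampMillis) {
        long start = windowStart(timestampMillis);
        if (start + sizeMillis <= watermarkMillis) {
            // The window has already fired and lateness is zero, so the element is dropped.
            droppedLate++;
            return;
        }
        open.computeIfAbsent(start, s -> new ArrayList<>()).add(element);
    }

    // Advances the watermark and returns one pane per window whose end it has reached.
    List<List<String>> advanceWatermark(long newWatermarkMillis) {
        watermarkMillis = Math.max(watermarkMillis, newWatermarkMillis);
        List<List<String>> fired = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + sizeMillis <= watermarkMillis) {
            // Removing the window means it can never fire a second time.
            fired.add(open.pollFirstEntry().getValue());
        }
        return fired;
    }

    int droppedLate() {
        return droppedLate;
    }
}
```

Advancing the watermark again without adding elements returns no panes, which matches the once-per-window behavior the answer describes. Note that with PubsubIO.readStrings(), element timestamps default to the Pub/Sub publish time unless a timestamp attribute is configured.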

huangapple
  • Published on 2020-04-05 09:37:59
  • When reposting, please keep the link to this article: https://java.coder-hub.com/61037001.html