Apache Beam/Java: how to set a window/trigger that sends the data only once for each window
Question
I have a pipeline as below:
Window<String> fixedWindow =
    Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(options.getWindowDuration()))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();

PCollectionTuple productProcessorPT = pipeline
    .apply(READ_PRODUCT_FROM_PUBSUB.getName(),
        PubsubIO.readStrings().fromSubscription(options.getProductSubscription()))
    .apply(PRODUCT_WINDOW.getName(), fixedWindow)
    .apply(PROCESS_PRODUCT.getName(), ParDo.of(new ProductProcessor()))
    .apply(GROUP_PRODUCT_DATA.getName(), GroupByKey.create())
    .apply(COMBINE_PRODUCT_DATA.getName(),
        ParDo.of(new ProductCombiner())
            .withOutputTags(KV_STRING_OBJECTNODE, TupleTagList.of(PIPELINE_ERROR)));
What I want to achieve is to set a window/trigger that gathers the data every 60s and then sends it to the next transform. How can I do that? I don't care about the event timestamp.
The code above sends data to the next transform every 60s, but it also keeps triggering/sending (the same) data even when no new data comes into the pipeline. I'm not sure why that happens.
Answer 1
Score: 0
You can remove the triggering and just use FixedWindows, as below, to emit records every 60 seconds:
Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())));
This uses the default trigger and the default handling of late data, which basically means that each window's data is emitted once, when the watermark passes the end of the window, and all late events are dropped.
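If you would rather spell that behaviour out than rely on the defaults, a minimal sketch could look like the one below. The class name OncePerWindow, the helper oncePerWindow and the hard-coded 60 are assumptions for illustration only (they stand in for the poster's options.getWindowDuration()); the Beam calls are the same ones already used in the question, minus the early firings.

```java
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class OncePerWindow {

  // Hypothetical helper (not from the original post): builds a fixed-window
  // transform whose trigger fires a single time, when the watermark passes
  // the end of the window.
  public static Window<String> oncePerWindow(long windowSeconds) {
    return Window.<String>into(FixedWindows.of(Duration.standardSeconds(windowSeconds)))
        // No withEarlyFirings(...): nothing is emitted before the window closes.
        .triggering(AfterWatermark.pastEndOfWindow())
        // Zero allowed lateness: elements arriving after the window closes are dropped.
        .withAllowedLateness(Duration.ZERO)
        // Beam requires an accumulation mode whenever a trigger is set explicitly.
        .discardingFiredPanes();
  }

  public static void main(String[] args) {
    // 60 is an assumed value standing in for options.getWindowDuration().
    Window<String> fixedWindow = oncePerWindow(60);
    System.out.println(fixedWindow.getWindowFn());
  }
}
```

The result can be passed to .apply(PRODUCT_WINDOW.getName(), ...) in place of the original fixedWindow. Since the trigger follows the watermark rather than the wall clock, the output appears when the runner estimates that the window's input is complete, which may be somewhat later than exactly 60 seconds after the window opened.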