如何创建一个包含历史和未来项目的Flux?

huangapple 未分类评论70阅读模式
标题翻译

How to create a Flux including historical and future items?

问题

我有一个应用程序,它接收事件(使用Spring应用程序监听器),我想创建一个Flux来表示一些历史事件(可能从持久性存储中读取),以及一个无限的流式传入事件。

目标是提供一个公共方法,返回所有已经被观察到的事件的Flux,即初始历史事件的串联,所有已观察到的事件以及所有未来事件。以下服务应该提供以stream()方法形式的API。

  1. @Service
  2. public class EventService implements ApplicationListener<Event> {
  3. private Flux<Integer> eventStream = getHistoricalEvents();
  4. private Flux<Integer> getHistoricalEvents() {
  5. return Flux.just(1,2,3,4,5);
  6. }
  7. @Override
  8. public void onApplicationEvent(Event event) {
  9. eventStream = eventStream.concatWithValues(event.hashCode());
  10. }
  11. public Flux<Integer> stream(){
  12. return eventStream;
  13. }
  14. }

这样可以实现前两个目标,即客户端接收到初始的一组历史事件以及迄今已观察到的所有事件。

如何添加未来事件?理想情况下,我正在寻找一个能够在语法上实现所有这些的解决方案,因为我对响应式编程还不熟悉。

英文翻译

I have an application that receives events (using Spring Application Listeners) and I want to create a Flux of some historical events (that might be read from persistence) and an infinite stream of incoming events.
The goal is to provide a public method that returns a Flux of all events that have ever been observed, i.e., the concatenation of the initial historical events, all events observed and all future events. The following service is supposed to provide the API in terms of the method stream().

  1. @Service
  2. public class EventService implements ApplicationListener&lt;Event&gt;{
  3. private Flux&lt;Integer&gt; eventStream = getHistoricalEvents();
  4. private Flux&lt;Integer&gt; getHistoricalEvents() {
  5. return Flux.just(1,2,3,4,5);
  6. }
  7. @Override
  8. public void onApplicationEvent(Event event) {
  9. eventStream = eventStream.concatWithValues(event.hashCode());
  10. }
  11. public Flux&lt;Integer&gt; stream(){
  12. return eventStream;
  13. }
  14. }

This works to achieve the first two goals, i.e., clients receive the initial set of historical events and all events that have been observed so far.

How can I add future events? Ideally, I am looking for a solution that achieves all that idiomatically as I am new to reactive programming.

答案1

得分: 0

你已经找到了如何与现有的持久化历史事件进行concat的方法。至于所有未来事件,听起来你很可能需要Flux.cache()

> 将这个Flux转变为热源,并缓存最近发出的信号以供进一步订阅者使用。将保留一个无限量的onNext信号。完成和错误也将被重新播放。

英文翻译

You've already worked out how to concat with existing persisted, historical events. As for all future events, it sounds very much like you need Flux.cache():

> Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.

huangapple
  • 本文由 发表于 2020年3月4日 05:43:03
  • 转载请务必保留本文链接:https://java.coder-hub.com/60515954.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定