限制 Reactor 中的订阅并发

huangapple 未分类评论45阅读模式
英文:

Limit subscription concurrency in Reactor

问题

我有一个实用的类,用于集中Reactor的Publisher,以处理Cassandra查询:

public Mono<ResultSet> execute(Statement statement) {
    return Mono.defer(() -> Mono.fromFuture(FutureConverter
            .toCompletableFuture(session.executeAsync(statement)))
            .publishOn(Schedulers.elastic()));
}

它的功能良好,但无法处理背压,这最终导致了Cassandra池的耗尽,从而引发错误。
有没有办法限制订阅的并发性(将它们放入FIFO队列),使其达到给定的数量(与池的大小匹配)?

提前感谢您的回答。

英文:

I have an utility class centralizing Reactor's Publishers to deal with Cassandra queries:

public Mono&lt;ResultSet&gt; execute(Statement statement) {
    return Mono.defer(() -&gt; Mono.fromFuture(FutureConverter
            .toCompletableFuture(session.executeAsync(statement)))
            .publishOn(Schedulers.elastic()));
}

It works well expect it does not deal with back-pressure, which ends up with Cassandra pool exhaustion, consequently an error.
Is there a way to to limit the concurrency of the subscriptions (putting them in a FIFO queue) to a given number (which will be match to the pool's size)?

Thanks in advance.

huangapple
  • 本文由 发表于 2020年4月7日 21:37:32
  • 转载请务必保留本文链接:https://java.coder-hub.com/61081324.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定