英文:
Limit subscription concurrency in Reactor
问题
我有一个实用的类,用于集中Reactor的Publisher
,以处理Cassandra查询:
public Mono<ResultSet> execute(Statement statement) {
return Mono.defer(() -> Mono.fromFuture(FutureConverter
.toCompletableFuture(session.executeAsync(statement)))
.publishOn(Schedulers.elastic()));
}
它的功能良好,但无法处理背压,这最终导致了Cassandra池的耗尽,从而引发错误。
有没有办法限制订阅的并发性(将它们放入FIFO队列),使其达到给定的数量(与池的大小匹配)?
提前感谢您的回答。
英文:
I have an utility class centralizing Reactor's Publisher
s to deal with Cassandra queries:
public Mono<ResultSet> execute(Statement statement) {
return Mono.defer(() -> Mono.fromFuture(FutureConverter
.toCompletableFuture(session.executeAsync(statement)))
.publishOn(Schedulers.elastic()));
}
It works well expect it does not deal with back-pressure, which ends up with Cassandra pool exhaustion, consequently an error.
Is there a way to to limit the concurrency of the subscriptions (putting them in a FIFO queue) to a given number (which will be match to the pool's size)?
Thanks in advance.
专注分享java语言的经验与见解,让所有开发者获益!
评论