英文:
RxJava: merging observables on different threads
问题
我需要合并在不同线程上运行的两个Observables。示例代码如下:
val obs1 = Observable
.create<Int> { emitter ->
emitter.onNext(1)
emitter.onComplete()
}
.subscribeOn(Schedulers.newThread())
val obs2 = Observable.just(2)
val obs = Observable
.just(1, 2)
.flatMap {
Observable.merge(
Observable.just(it).filter { it == 1 }.flatMap { obs1 },
Observable.just(it).filter { it == 2 }.flatMap { obs2 }
)
}
.serialize()
obs.blockingSubscribe(::println)
期望的输出是 `1, 2`,有时会得到 `2, 1`。
似乎 `serialize()` 无法解决问题。
如何实现正确的发射顺序 - `1, 2`?
英文:
I need to merge two observables running on different threads. Sample code is
val obs1 = Observable
.create<Int> { emitter ->
emitter.onNext(1)
emitter.onComplete()
}
.subscribeOn(Schedulers.newThread())
val obs2 = Observable.just(2)
val obs = Observable
.just(1, 2)
.flatMap {
Observable.merge(
Observable.just(it).filter { it == 1 }.flatMap { obs1 },
Observable.just(it).filter { it == 2 }.flatMap { obs2 }
)
}
.serialize()
obs.blockingSubscribe(::println)
Expecting output as 1, 2
, sometime get it 2, 1
.
It seems serialize()
doesn't help.
How can I achieve right emission order - 1, 2
?
答案1
得分: 0
已迅速找到合适的解决方案
val obs = Observable
.just(1, 2)
.concatMap {
if (it == 1) obs1 else obs2
}
英文:
Have found proper solution quickly
val obs = Observable
.just(1, 2)
.concatMap {
if (it == 1) obs1 else obs2
}
专注分享java语言的经验与见解,让所有开发者获益!
评论