RxJava:在不同线程上合并 observables。

huangapple 未分类评论59阅读模式
英文:

RxJava: merging observables on different threads

问题

我需要合并在不同线程上运行的两个Observables。示例代码如下:

val obs1 = Observable
.create<Int> { emitter ->
emitter.onNext(1)
emitter.onComplete()
}
.subscribeOn(Schedulers.newThread())

val obs2 = Observable.just(2)

val obs = Observable
.just(1, 2)
.flatMap {
Observable.merge(
Observable.just(it).filter { it == 1 }.flatMap { obs1 },
Observable.just(it).filter { it == 2 }.flatMap { obs2 }
)
}
.serialize()

obs.blockingSubscribe(::println)


期望的输出是 `1, 2`,有时会得到 `2, 1`。
似乎 `serialize()` 无法解决问题。
如何实现正确的发射顺序 - `1, 2`?
英文:

I need to merge two observables running on different threads. Sample code is

val obs1 = Observable
	.create&lt;Int&gt; { emitter -&gt;
		emitter.onNext(1)
		emitter.onComplete()
	}
	.subscribeOn(Schedulers.newThread())

val obs2 = Observable.just(2)

val obs = Observable
	.just(1, 2)
	.flatMap {
		Observable.merge(
			Observable.just(it).filter { it == 1 }.flatMap { obs1 },
			Observable.just(it).filter { it == 2 }.flatMap { obs2 }
		)
	}
	.serialize()

obs.blockingSubscribe(::println)

Expecting output as 1, 2, sometime get it 2, 1.
It seems serialize() doesn't help.
How can I achieve right emission order - 1, 2?

答案1

得分: 0

已迅速找到合适的解决方案 RxJava:在不同线程上合并 observables。

val obs = Observable
    .just(1, 2)
	.concatMap {
        if (it == 1) obs1 else obs2
	}
英文:

Have found proper solution quickly RxJava:在不同线程上合并 observables。

val obs = Observable
    .just(1, 2)
	.concatMap {
        if (it == 1) obs1 else obs2
	}

huangapple
  • 本文由 发表于 2020年5月3日 17:28:08
  • 转载请务必保留本文链接:https://java.coder-hub.com/61572251.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定