Is there a doAfterSubscribe equivalent in RxJava2?
Question
I have an observable that streams the responses to all requests. When a request is made, I want to create a filtered view of that observable so that I can share its output with multiple subscribers. Here is some example code.
PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
        .doOnSubscribe(disposable -> {
            publishSubject.onNext("foobar");
        })
        .replay(1)
        .refCount();
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
I'm using a PublishSubject as my mock service because sometimes the service will return a response immediately.
What I'm finding is that when there is an immediate result, my fooObservable is not populated, because there is no subscription yet at that moment. That is, I get no log output when I want to see:
A val : <foobar>
Note that I get the same result with this code:
PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
        .replay(1)
        .refCount();
publishSubject.onNext("foobar");
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
So the issue is that fooObservable doesn't subscribe to the PublishSubject until after it has itself been subscribed to.
Is there a way to run code immediately after the first subscription to the fooObservable?
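The ordering problem described above can be reproduced without RxJava. The following is only a minimal sketch: TinySubject is a hypothetical stand-in that mimics how a hot subject delivers a value only to the observers registered at the moment of emission. It is not RxJava's PublishSubject implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LateSubscriberDemo {
    // Hypothetical stand-in for a hot subject; not RxJava's PublishSubject.
    static class TinySubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        // Delivers only to observers that are registered right now.
        void onNext(T value) {
            for (Consumer<T> observer : new ArrayList<>(observers)) {
                observer.accept(value);
            }
        }
    }

    // Emits before the observer is registered, so the value is dropped.
    static List<String> emitThenSubscribe() {
        TinySubject<String> subject = new TinySubject<>();
        List<String> received = new ArrayList<>();
        subject.onNext("foobar");
        subject.subscribe(received::add);
        return received;
    }

    // Registers the observer first, so the value is delivered.
    static List<String> subscribeThenEmit() {
        TinySubject<String> subject = new TinySubject<>();
        List<String> received = new ArrayList<>();
        subject.subscribe(received::add);
        subject.onNext("foobar");
        return received;
    }

    public static void main(String[] args) {
        System.out.println("emit then subscribe: " + emitThenSubscribe());
        System.out.println("subscribe then emit: " + subscribeThenEmit());
    }
}
```

In the first case the list stays empty, which matches the missing log line: the value is emitted while nobody is registered.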
Edit:
I thought about something like:
PublishSubject<String> publishSubject = PublishSubject.create();
BehaviorSubject<String> fooObservable = BehaviorSubject.create();
publishSubject.filter(value -> value.startsWith("foo")).subscribe(fooObservable);
publishSubject.onNext("foobar");
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
But then I have two subscriptions, and I'm not sure how to clean up, because the subscribe call after the filter does not return a Disposable.
Edit 2 : A description of the background task.
I have a third party service that my code needs to subscribe to. This service calls an onResponse method in my code with a parameter containing my original request and the response. The response may be updated by a new call to onResponse at any time.
I want to create a wrapper for this service that provides a method:
public Observable<Response> getObservable(Request req);
If the request matches one that is already subscribed then the observable should provide the most recent matching value immediately on subscription.
When there are no subscribers I need to unsubscribe from the service I'm wrapping.
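The requirements above (share one upstream subscription, hand the latest value to new subscribers, disconnect when the last subscriber leaves) can be illustrated with a plain-Java sketch of the bookkeeping involved. Everything here is hypothetical and for illustration only: SharedFeed, its connect and disconnect callbacks, and the Runnable returned as a cancel handle are not part of RxJava or of the third-party service. The sketch registers the listener before connecting, so a response delivered synchronously during connect is not lost. It also assumes the cached value should be discarded on disconnect, which is a design choice rather than a requirement from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SharedFeedDemo {
    // Hypothetical ref-counted wrapper; names are illustrative only.
    static class SharedFeed<T> {
        private final Consumer<Consumer<T>> connect; // subscribes to the wrapped service
        private final Runnable disconnect;           // unsubscribes from the wrapped service
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private T latest;                            // most recent value, null if none yet

        SharedFeed(Consumer<Consumer<T>> connect, Runnable disconnect) {
            this.connect = connect;
            this.disconnect = disconnect;
        }

        // Called by the wrapped service whenever a response arrives.
        synchronized void onResponse(T value) {
            latest = value;
            for (Consumer<T> listener : new ArrayList<>(listeners)) {
                listener.accept(value);
            }
        }

        // Registers the listener and returns a handle that removes it when run.
        synchronized Runnable subscribe(Consumer<T> listener) {
            listeners.add(listener);
            if (latest != null) {
                listener.accept(latest);          // replay the latest value to a late subscriber
            }
            if (listeners.size() == 1) {
                connect.accept(this::onResponse); // first subscriber: connect upstream
            }
            return () -> unsubscribe(listener);
        }

        private synchronized void unsubscribe(Consumer<T> listener) {
            if (listeners.remove(listener) && listeners.isEmpty()) {
                latest = null;                    // assumption: drop the cache on disconnect
                disconnect.run();                 // last subscriber gone: disconnect upstream
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // The mock service responds immediately while connecting.
        SharedFeed<String> feed = new SharedFeed<String>(
                sink -> { log.add("connect"); sink.accept("foobar"); },
                () -> log.add("disconnect"));
        Runnable a = feed.subscribe(v -> log.add("A got " + v));
        Runnable b = feed.subscribe(v -> log.add("B got " + v));
        a.run();
        b.run();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running the demo records connect, A got foobar, B got foobar, disconnect, in that order: the first subscriber sees the immediate response, the second gets the cached value, and the upstream is released only after both have left.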
Answer 1
Score: 0
I think publish() and connect() are what you need. Here you can read more about it.
In your case it would be something like:
PublishSubject<String> publishSubject = PublishSubject.create();
// publish() returns a ConnectableObservable, which is the type that exposes connect()
ConnectableObservable<String> fooObservable = publishSubject
        .filter(value -> value.startsWith("foo"))
        .publish();
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
Observable<Object> o2 = fooObservable.map(val -> new Object());
Observable<Object> o3 = fooObservable.map(val -> /* Something here */);
Disposable disposable = fooObservable.connect();
But remember to call disposable.dispose() when you are done, to avoid a leak.
EDIT
BehaviorSubject<String> publishSubject = BehaviorSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"));
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
publishSubject.onNext("foobar");
Answer 2
Score: 0
I think the latest edit of the question helps explain what you're trying to do and this GitHub link probably has the answer you're looking for:
https://github.com/ReactiveX/RxJava/issues/4675
But I'll still share my test code. I ended up following the advice from the link above and using PublishSubject with .replay(1).refCount().
Let's say we have a third-party service interface:
interface ThirdPartyService
{
    void subscribe( Consumer<Integer> responseConsumer );
    void unsubscribe();
}
Next, let's create a mock implementation that invokes the consumer with a random int once per second until unsubscribe() is called:
// Mock service to emit a random integer once per second, no Rx:
ThirdPartyService mockService = new ThirdPartyService() {
    Timer timer = new Timer();

    @Override
    public void subscribe( Consumer<Integer> responseConsumer )
    {
        System.out.println( "Subscribe" );
        Random random = new Random();
        TimerTask task = new TimerTask() {
            @Override
            public void run()
            {
                int i = random.nextInt( 10 );
                System.out.println( "Producing: " + i );
                responseConsumer.accept( i );
            }
        };
        timer.schedule( task, 1000, 1000 );
    }

    @Override
    public void unsubscribe()
    {
        System.out.println( "Unsubscribe" );
        timer.cancel();
    }
};
Next, the actual Rx pipeline. Let's say I want to keep only odd integers and unsubscribe from the service when there are no observers:
// Wrap service in a PublishSubject.
// Note: this subscribes to the service right away, before any observer exists.
PublishSubject<Integer> subject = PublishSubject.create();
mockService.subscribe( subject::onNext );
// Create observable:
Observable<Integer> observable = subject
        .doFinally( mockService::unsubscribe ) // Runs when refCount disposes the upstream
        .filter( i -> i % 2 == 1 )             // Include only odd integers
        .replay( 1 )                           // Replay latest to new observers
        .refCount();
Finally, a manual test:
// Subscribe to Observable:
Disposable sub1 = observable.subscribe( i -> System.out.println( "sub1 got: " + i ));
// Sleep:
Thread.sleep( 3300 );
// Create 2nd Subscriber:
System.out.println( "adding sub2" );
Disposable sub2 = observable.subscribe( i -> System.out.println( "sub2 got: " + i ));
// Sleep:
Thread.sleep( 3300 );
// Dispose 2nd Subscriber:
System.out.println( "disposing sub2" );
sub2.dispose();
// Sleep:
Thread.sleep( 3300 );
// Dispose 1st Subscriber:
sub1.dispose();
// Sleep:
Thread.sleep( 3300 );
Output:
Subscribe
Producing: 1
sub1 got: 1
Producing: 8
Producing: 6
adding sub2
sub2 got: 1
Producing: 3
sub1 got: 3
sub2 got: 3
Producing: 7
sub1 got: 7
sub2 got: 7
Producing: 1
sub1 got: 1
sub2 got: 1
disposing sub2
Producing: 6
Producing: 7
sub1 got: 7
Producing: 1
sub1 got: 1
Unsubscribe