Is there a doAfterSubscribe equivalent in RxJava2?

Question

I have an observable that streams responses to all requests. When a request is made, I want to create a filtered view of that observable so that I can share the output with multiple subscribers. The following is some example code.

    PublishSubject<String> publishSubject = PublishSubject.create();
    Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
            .doOnSubscribe(disposable -> {
                publishSubject.onNext("foobar");
            })
            .replay(1)
            .refCount();

    fooObservable.subscribe(val -> log.info("A val : <{}>", val));

I'm using a PublishSubject as my mock service because sometimes the service will return a response immediately.

What I'm finding is that when a result arrives immediately, there is no subscription in place yet, so my fooObservable is never populated. That is, I get no log output when I expect to see:

    A val : <foobar>

Note that I get the same result with this code :

    PublishSubject<String> publishSubject = PublishSubject.create();
    Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
            .replay(1)
            .refCount();

    publishSubject.onNext("foobar");
    fooObservable.subscribe(val -> log.info("A val : <{}>", val));

So the issue is that fooObservable doesn't subscribe to the PublishSubject until it has itself been subscribed to.
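
To make the lost emission concrete, here is a minimal plain-Java model of the situation. It does not use RxJava: `HotSource` and `CachingSource` are hypothetical stand-ins, invented for this sketch, for a source that drops values when nobody is listening and for one that remembers its latest value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model (not RxJava) of a hot source versus a latest-value cache. */
class HotVersusCachedDemo {

    /** Hypothetical hot source: a value emitted while nobody listens is dropped. */
    static class HotSource<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        void onNext(T value) {
            for (Consumer<T> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    /** Hypothetical caching source: a new listener first receives the latest value. */
    static class CachingSource<T> extends HotSource<T> {
        private T latest;
        private boolean hasValue;

        @Override
        void subscribe(Consumer<T> listener) {
            if (hasValue) {
                listener.accept(latest);
            }
            super.subscribe(listener);
        }

        @Override
        void onNext(T value) {
            latest = value;
            hasValue = true;
            super.onNext(value);
        }
    }

    /** Emits "foobar" before anyone listens, then subscribes and returns what arrived. */
    static List<String> emitThenSubscribe(HotSource<String> source) {
        List<String> received = new ArrayList<>();
        source.onNext("foobar");
        source.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("hot:    " + emitThenSubscribe(new HotSource<>()));     // hot:    []
        System.out.println("cached: " + emitThenSubscribe(new CachingSource<>())); // cached: [foobar]
    }
}
```

The first case mirrors the snippets above: the value is emitted before any listener is attached, so nothing is delivered. The second case shows that the value must be cached somewhere that is already attached to the source at the time it is emitted.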

Is there a way to run code immediately after the first subscription to the fooObservable?

Edit:
I thought about something like :

    PublishSubject<String> publishSubject = PublishSubject.create();
    BehaviorSubject<String> fooObservable = BehaviorSubject.create();
    publishSubject.filter(value -> value.startsWith("foo")).subscribe(fooObservable);
    publishSubject.onNext("foobar");
    fooObservable.subscribe(val -> log.info("A val : <{}>", val));

But then I've got 2 subscriptions and I'm not sure how to clean up as the subscribe after the filter does not return a disposable.

Edit 2 : A description of the background task.

I have a third party service that my code needs to subscribe to. This service calls an onResponse method in my code with a parameter containing my original request and the response. The response may be updated by a new call to onResponse at any time.

I want to create a wrapper for this service that provides a method:

    public Observable<Response> getObservable(Request req);

If the request matches one that is already subscribed then the observable should provide the most recent matching value immediately on subscription.

When there are no subscribers I need to unsubscribe from the service I'm wrapping.

Answer 1

Score: 0

I think publish() and connect() are what you need. Here you can read more about it.

In your case it's gonna be something like:

    PublishSubject<String> publishSubject = PublishSubject.create();
    // publish() returns a ConnectableObservable, which is what exposes connect()
    ConnectableObservable<String> fooObservable = publishSubject
        .filter(value -> value.startsWith("foo"))
        .publish();

    fooObservable.subscribe(val -> log.info("A val : <{}>", val));
    Observable<Object> o2 = fooObservable.map(val -> new Object());
    Observable<Object> o3 = fooObservable.map(val -> /* Something here */);

    // Nothing is emitted to the observers above until connect() is called
    Disposable disposable = fooObservable.connect();

But remember to call disposable.dispose() when you are done, to avoid a leak.

EDIT

    BehaviorSubject<String> publishSubject = BehaviorSubject.create();
    Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"));
    fooObservable.subscribe(val -> log.info("A val : <{}>", val));
    publishSubject.onNext("foobar");

Answer 2

Score: 0

I think the latest edit of the question helps explain what you're trying to do and this GitHub link probably has the answer you're looking for:
https://github.com/ReactiveX/RxJava/issues/4675

But I'll still share my test code. I ended up following the advice from the link above and using PublishSubject with .replay(1).refCount().

Let's say we have a third-party service interface:

    interface ThirdPartyService
    {
        void subscribe( Consumer<Integer> responseConsumer );

        void unsubscribe();
    }

Next let's create a mock implementation that will invoke the consumer with a random int until it unsubscribes:

    // Mock service to emit a random integer once per second, no Rx:
    ThirdPartyService mockService = new ThirdPartyService() {

        Timer timer = new Timer();

        @Override
        public void subscribe( Consumer<Integer> responseConsumer )
        {
            System.out.println( "Subscribe" );
            Random random = new Random();

            TimerTask task = new TimerTask() {
                @Override
                public void run()
                {
                    int i = random.nextInt( 10 );
                    System.out.println( "Producing: " + i );
                    responseConsumer.accept( i );
                }
            };

            timer.schedule( task, 1000, 1000 );
        }

        @Override
        public void unsubscribe()
        {
            System.out.println( "Unsubscribe" );
            timer.cancel();
        }
    };

Next, the actual Rx pipeline. Let's say I want to filter only odd integers and unsubscribe from service when there are no observers:

    // Wrap service in a PublishSubject:
    PublishSubject<Integer> subject = PublishSubject.create();
    mockService.subscribe( subject::onNext );

    // Create observable:
    Observable<Integer> observable = subject
            .doFinally( mockService::unsubscribe )
            .filter( i -> i % 2 == 1 )    // Include only odd integers
            .replay( 1 )                  // Replay latest to new observers
            .refCount();

Finally, a manual test:

    // Subscribe to Observable:
    Disposable sub1 = observable.subscribe( i -> System.out.println( "sub1 got: " + i ));

    // Sleep:
    Thread.sleep( 3300 );

    // Create 2nd Subscriber:
    System.out.println( "adding sub2" );
    Disposable sub2 = observable.subscribe( i -> System.out.println( "sub2 got: " + i ));

    // Sleep:
    Thread.sleep( 3300 );

    // Dispose 2nd Subscriber:
    System.out.println( "disposing sub2" );
    sub2.dispose();

    // Sleep:
    Thread.sleep( 3300 );

    // Dispose 1st Subscriber:
    sub1.dispose();

    // Sleep:
    Thread.sleep( 3300 );

Output:

Subscribe
Producing: 1
sub1 got: 1
Producing: 8
Producing: 6
adding sub2
sub2 got: 1
Producing: 3
sub1 got: 3
sub2 got: 3
Producing: 7
sub1 got: 7
sub2 got: 7
Producing: 1
sub1 got: 1
sub2 got: 1
disposing sub2
Producing: 6
Producing: 7
sub1 got: 7
Producing: 1
sub1 got: 1
Unsubscribe
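
For readers who want to see the semantics being relied on here, replaying the latest value to new subscribers and disconnecting from the service once the last subscriber leaves, the following is a rough plain-Java model. It is only a sketch and does not use RxJava: `RefCountedLatest`, its nested `Service` interface, and `demo()` are hypothetical names invented for illustration, and forgetting the cached value on disconnect is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model (not RxJava) of "replay latest, disconnect when nobody listens". */
class RefCountedLatest<T> {

    /** Hypothetical minimal view of the wrapped callback service. */
    interface Service<T> {
        void subscribe(Consumer<T> responseConsumer);
        void unsubscribe();
    }

    private final Service<T> service;
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T latest;
    private boolean hasLatest;

    RefCountedLatest(Service<T> service) {
        this.service = service;
    }

    /** Registers a listener and returns a Runnable that removes it again. */
    synchronized Runnable subscribe(Consumer<T> listener) {
        // Register first, so a response delivered during service.subscribe() is not lost.
        listeners.add(listener);
        if (listeners.size() == 1) {
            service.subscribe(this::dispatch);   // first listener: connect to the service
        } else if (hasLatest) {
            listener.accept(latest);             // later listeners: replay the latest value
        }
        return () -> remove(listener);
    }

    private synchronized void dispatch(T value) {
        if (listeners.isEmpty()) {
            return;                              // ignore late callbacks after disconnect
        }
        latest = value;
        hasLatest = true;
        for (Consumer<T> listener : new ArrayList<>(listeners)) {
            listener.accept(value);
        }
    }

    private synchronized void remove(Consumer<T> listener) {
        if (listeners.remove(listener) && listeners.isEmpty()) {
            service.unsubscribe();               // last listener gone: disconnect
            latest = null;                       // sketch's choice: forget the cached value
            hasLatest = false;
        }
    }

    /** Runs a small scenario against a fake service and returns the event log. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Service<Integer> fake = new Service<Integer>() {
            @Override
            public void subscribe(Consumer<Integer> responseConsumer) {
                log.add("Subscribe");
                responseConsumer.accept(1);      // the fake service answers immediately
            }

            @Override
            public void unsubscribe() {
                log.add("Unsubscribe");
            }
        };
        RefCountedLatest<Integer> shared = new RefCountedLatest<>(fake);
        Runnable sub1 = shared.subscribe(i -> log.add("sub1 got: " + i));
        Runnable sub2 = shared.subscribe(i -> log.add("sub2 got: " + i));
        sub2.run();
        sub1.run();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the first listener is registered before the service is contacted, which is why an immediate response still reaches it. The second listener receives the cached value on subscription, and the service is unsubscribed only after both listeners have been removed.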

huangapple
  • Posted on April 4, 2020 at 21:33:33
  • When reposting, please keep the link to this article: https://java.coder-hub.com/61028886.html