RXJava Observable onNext emitted on different threads

huangapple 未分类评论44阅读模式
英文:

RXJava Observable onNext emitted on different threads

问题

如果我尝试在不同的线程上发射onNext在订阅它时无法捕获流上的下一个元素

public static Observable<Student> getStudents()
{
    return Observable.create(e -> {

          for(int i : Arrays.asList(1,2,3))
          {
              Thread t = new Thread(() -> {

                  e.onNext(new Student("anirba", i));

              });
              t.start();
          }
          e.onComplete();
    });
}

订阅此Observable后我没有收到任何响应

Observable<Student> observer = getStudents();

observer.subscribe(i -> System.out.println(i));
英文:

If i try to emit onNext on different thread's, on subscribing it dosent catch the stream on next elements.

public static Observable&lt;Student&gt; getStudents()
{
    return Observable.create(e -&gt; {

          for(int i : Arrays.asList(1,2,3))
          {
              Thread t = new Thread(() -&gt; {

                  e.onNext(new Student(&quot;anirba&quot;, i));

              });
              t.start();
          }
          e.onComplete();
    });
}

On sunbscribing to this observable i dont get any responseenter code here

Observable&lt;Student&gt; observer = getStudents();

    observer.subscribe(i -&gt; System.out.println(i));

答案1

得分: 0

你正在create方法中创建了三个线程,每个线程都在流中添加一个Student对象。之所以没有得到任何输出,是因为这三个线程将独立运行,并且根据线程调度器进行执行。在你的情况下,onComplete()方法可能会在这三个线程将数据添加到Observable流之前被调用。在调用onComplete后,流将被关闭,不会再接受更多数据。要使其正常工作,请进行以下更改,应该可以按你的期望工作。

public static Observable<Student> getStudents() {
    return Observable.create(e -> {
          for(int i : Arrays.asList(1,2,3)) {
              Thread t = new Thread(() -> {
                  e.onNext(new Student("anirba", i));
              });
              t.start();
          }
          Thread.sleep(1000);
          e.onComplete();
    });
}
英文:

You are creating three threads inside the create method and each is adding an object of Student to stream. The reason why you are not getting any output is all these three threads will be running independently and based on thread scheduler it will be executed. In your case, the onComplete() method might get called before all these three threads add data into the Observable stream. and on-call of onComplete, the stream will be closed and no more data will be accepted by the stream. To make it work just to the below changes, it should work as you are expecting.

public static Observable&lt;Student&gt; getStudents() {
    return Observable.create(e -&gt; {
          for(int i : Arrays.asList(1,2,3)) {
              Thread t = new Thread(() -&gt; {
                  e.onNext(new Student(&quot;anirba&quot;, i));
              });
              t.start();
          }
          Thread.sleep(1000);
          e.onComplete();
    });
}

答案2

得分: 0

java.util.concurrent.Executor 现在是执行任务的正确 API。要阻塞当前线程,您可以使用 CountDownLatch,并在所有任务终止时释放它。

ExecutorService executor = Executors.newCachedThreadPool(); // 选择正确的线程池

public Observable<Student> getStudents() {
    return Observable.<Student>create(emitter -> {
        List<Integer> source = Arrays.asList(1, 2, 3);
        CountDownLatch latch = new CountDownLatch(source.size());

        source
                .forEach(i ->
                        executor.submit(() -> {
                            emitter.onNext(new Student("anirba", i));
                            latch.countDown();
                        }));

        latch.await();
        emitter.onComplete();
    }).serialize();
}

另外在其他地方不要忘记在 executor 上调用 shutdown()

您还可以添加 serialize() 操作符以避免 onnext() 调用重叠。

来自契约

Observables 必须以串行方式向观察者发出通知(不得并行)。它们可以从不同的线程发出这些通知,但通知之间必须存在正式的 happens-before 关系。

为了测试目的,您可以添加 Thread.sleep(x) 来查看日志。我之前在这里已经回答过这个问题。

public static void main(String[] args) throws InterruptedException {
     getStudents()
             .subscribe(i -> System.out.println(i));
    
    Thread.sleep(2000);
}
英文:

java.util.concurrent.Executor is the right API now for running tasks. To block the current thread you can use CountDownLatch and release it when all tasks are terminated.

ExecutorService executor = Executors.newCachedThreadPool(); // choose the right one

public Observable&lt;Student&gt; getStudents() {
    return Observable.&lt;Student&gt;create(emitter -&gt; {
        List&lt;Integer&gt; source = Arrays.asList(1, 2, 3);
        CountDownLatch latch = new CountDownLatch(source.size());

        source
                .forEach(i -&gt;
                        executor.submit(() -&gt; {
                            emitter.onNext(new Student(&quot;anirba&quot;, i));
                            latch.countDown();
                        }));

        latch.await();
        emitter.onComplete();
    }).serialize();
}

And elsewhere don't forget to call shutdown() on the executor.

You may also add serialize() operator to avoid onnext() calls to overlap.

From the contract :

> Observables must issue notifications to observers serially (not in
> parallel). They may issue these notifications from different threads,
> but there must be a formal happens-before relationship between the
> notifications.

For the perpose of testing you can add Thread.sleep(x) to see your loggin. I've already answerd this before here

public static void main(String[] args) throws InterruptedException {
     getStudents()
             .subscribe(i -&gt; System.out.println(i));
    
    Thread.sleep(2000);
}

huangapple
  • 本文由 发表于 2020年7月25日 21:34:22
  • 转载请务必保留本文链接:https://java.coder-hub.com/63088984.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定