Spring WebFlux消费者到Sink

huangapple 未分类评论47阅读模式
英文:

Spring WebFlux consumer to sink

问题

这里是一个简单的Spring Boot应用程序:

@SpringBootApplication
@RestController
public class ReactiveApplication {
    
    static Flux<String> fluxString;
    static volatile Queue<String> queue = new ConcurrentLinkedQueueProxy();
    
    private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean add(String e) {
            synchronized (this) {
                notify();
            }
            return super.add(e);
        }
        
        @Override
        public String poll() {
            synchronized (this) {
                if(isEmpty()) {
                    try {
                        wait();
                    } catch (InterruptedException ex) {}
                }
            }
            return super.peek() == null ? "" : super.poll();
        }
    }
    
    static Consumer<String> consumer = str -> queue.add(str);

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ReactiveApplication.class, args);
    }
    
    static {
        for(int i = 0; i < 10; i++)
            queue.add("testData " + i + " ");
    }
    
    @GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> home() {
        
        Scheduler sch = Schedulers.newParallel("parallel-sch", 1);
        List<String> list = new ArrayList<>(queue);
        queue.removeAll(queue);
        
        fluxString = Flux.<String>create(sink -> {
            sink.onRequest(n -> {
                for(int i = 0; i < n; i++) {
                    sink.next(queue.poll());
                }
            }).onCancel(() -> sch.dispose());
        }).log().subscribeOn(sch).mergeWith(Flux.<String>fromIterable(list));
        
        return fluxString;
        
    }
    
    @GetMapping("/add")
    public String add(@RequestParam String s) {
        consumer.accept(s);
        return s;
    }
}

因此,这个应用程序创建了一个字符串流。访问/将获取队列中存在的所有字符串,然后将从/add资源添加的任何内容合并在一起(忽略"Safe Methods Must be Idempotent"部分)。

我感到奇怪的是,当我将public static void main(...)移动到第一行时,应用程序开始表现异常,而向/add添加新值没有任何效果。我认为可能有一些有趣的事情正在发生,导致应用程序的异常行为。是否有任何解释呢?

英文:

Here is a simple spring boot application:

@SpringBootApplication
@RestController
public class ReactiveApplication {
	
	static Flux&lt;String&gt; fluxString;
	static volatile Queue&lt;String&gt; queue = new ConcurrentLinkedQueueProxy();
	
	private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue&lt;String&gt; {
		private static final long serialVersionUID = 1L;

		@Override
		public boolean add(String e) {
			synchronized (this) {
				notify();
			}
			return super.add(e);
		}
		
		@Override
		public String poll() {
			synchronized (this) {
				if(isEmpty()) {
					try {
						wait();
					} catch (InterruptedException ex) {}
				}
			}
			return super.peek() == null ? &quot;&quot; : super.poll();
		}
	}
	
	static Consumer&lt;String&gt; consumer = str -&gt; queue.add(str);

	public static void main(String[] args) throws InterruptedException {
		SpringApplication.run(ReactiveApplication.class, args);
	}
	
	static {
		for(int i = 0; i &lt; 10; i++)
			queue.add(&quot;testData &quot; + i + &quot; &quot;);
	}
	
	@GetMapping(value = &quot;/&quot;, produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux&lt;String&gt; home() {
		
		Scheduler sch = Schedulers.newParallel(&quot;parallel-sch&quot;, 1);
		List&lt;String&gt; list = new ArrayList&lt;&gt;(queue);
		queue.removeAll(queue);
		
		fluxString = Flux.&lt;String&gt;create(sink -&gt; {
			sink.onRequest(n -&gt; {
				for(int i = 0; i &lt; n; i++) {
					sink.next(queue.poll());
				}
			}).onCancel(() -&gt; sch.dispose());
		}).log().subscribeOn(sch).mergeWith(Flux.&lt;String&gt;fromIterable(list));
		
		return fluxString;
		
	}
	
	@GetMapping(&quot;/add&quot;)
	public String add( @RequestParam String s) {
		consumer.accept(s);
		return s;
	}
	
}

So basically this application creates a String stream. Visiting / will grab all the string present queue and then merge anything that is added from /add resource(ignore the "Safe Methods Must be Idempotent" thing).

What I feel is strange is that when I move public static void main(...) to line 1, the application starts to misbehave and adding new values to /add doesn't have any effect. I think there must be something interesting going on that is making application misbehave. Any explaination?

答案1

得分: 0

我最终使用了这段代码它运行得很好

@SpringBootApplication
@RestController
public class ReactiveApplication {

    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
    private static Consumer<String> consumer = str -> {
        try { queue.put(str); }
        catch (InterruptedException e) {}
    };
    static {
        for (int i = 0; i < 10; i++) queue.add("testData " + i + " ");
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }

    @GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> home() {

        final Scheduler sch = Schedulers.newSingle("async-flux");

        return Flux.<String>generate(sink -> {
            try { sink.next(queue.take()); }
            catch (InterruptedException e) { }
        }).log().subscribeOn(sch);

    }

    @GetMapping("/add")
    public String add(@RequestParam String s) {
        consumer.accept(s);
        return s;
    }

}
英文:

I ended up using this which works great:

@SpringBootApplication
@RestController
public class ReactiveApplication {

	private static BlockingQueue&lt;String&gt; queue = new ArrayBlockingQueue&lt;&gt;(1000);
	private static Consumer&lt;String&gt; consumer = str -&gt; {
		try { queue.put(str); }
		catch (InterruptedException e) {}
	};
	static {
		for (int i = 0; i &lt; 10; i++) queue.add(&quot;testData &quot; + i + &quot; &quot;);
	}

	public static void main(String[] args) {
		SpringApplication.run(ReactiveApplication.class, args);
	}

	@GetMapping(value = &quot;/&quot;, produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux&lt;String&gt; home() {

		final Scheduler sch = Schedulers.newSingle(&quot;async-flux&quot;);

		return Flux.&lt;String&gt;generate(sink -&gt; {
			try { sink.next(queue.take()); }
			catch (InterruptedException e) { }
		}).log().subscribeOn(sch);

	}

	@GetMapping(&quot;/add&quot;)
	public String add(@RequestParam String s) {
		consumer.accept(s);
		return s;
	}

}

huangapple
  • 本文由 发表于 2020年5月3日 21:47:50
  • 转载请务必保留本文链接:https://java.coder-hub.com/61575514.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定