Achieving synchronous behavior with ThreadPoolExecutor using SynchronousQueue

Question

I am writing a service class that is supposed to have two modes: synchronous and asynchronous.

public class ProcessorImpl implements IProcessor {

    private final MyRepo repo;
    private final Jobrunner runner;
    private final boolean isAsync;


    private ExecutorService executorService;

    @ProcessorImpl
    public ProcessorImpl(final MyRepo repo,
                              final Jobrunner runner) {
        this(repo, runner, false);
    }

    public ProcessorImpl(final MyRepo repo,
                              final Jobrunner runner,
                              final boolean isAsync) {
        this.repo = repo;
        this.runner = runner;
        this.isAsync = isAsync;
    }

    @PostConstruct
    public void init() {
        if (isAsync) {          
            executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50));
        } else {
            executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        }
    }

    @Override
    public Response doAction(final Request request, final String id) {
		if (isAsync) {
			//submit task and return incomplete response with id
		} else {
			//submit task and get result and return response that is returned by Callable
		}
    }
}

The switching is driven by the isAsync flag.

In synchronous mode, I want to submit the task, get the completed result back from the submission, and use it to build the Response to return. For this mode, I am using SynchronousQueue.

In asynchronous mode, I just submit the task and return immediately with an incomplete response. For this mode, I am using LinkedBlockingQueue so that pending tasks wait in the queue.

I have two questions regarding synchronous mode:

  1. How do I get the result back and give response in a synchronous way? I am unsure how to use Future here.

  2. What if another task comes in while the thread is still processing the current one? Will it get rejected? How can I avoid rejection?

Any help with some code samples will be appreciated.

Thanks

Answer 1

Score: 0

Something like this:

public class SyncProcessorImpl {
    private final MyRepo repo;
    private final Jobrunner runner;

    public SyncProcessorImpl (final MyRepo repo, final Jobrunner runner) {
        this.repo = repo;
        this.runner = runner;
    }

    public Response doAction(final Request request, final String id) {
        // do all calculations directly and return complete response
    }
}

public class AsyncProcessorImpl extends SyncProcessorImpl {
    private ExecutorService executorService;

    public AsyncProcessorImpl(final MyRepo repo, final Jobrunner runner) {
        super(repo, runner);
        executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50));
    }

    public Future<Response> doActionAsync(final Request request, final String id) {
        // submit the task and return the Future that submit(Callable) gives back
        return executorService.submit(() -> super.doAction(request, id));
    }
}
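
To make the two questions concrete, here is a minimal sketch rather than a definitive implementation. It assumes a hypothetical `process` method standing in for the real job, and the class and method names (`SyncQueueDemo`, `busyPool`, and so on) are made up for illustration. It shows blocking on `Future.get()` to obtain the result synchronously, what happens to a second submission when a single-thread pool backed by a `SynchronousQueue` is busy, and one possible way to avoid the rejection using `ThreadPoolExecutor.CallerRunsPolicy`.

```java
import java.util.concurrent.*;

public class SyncQueueDemo {

    // Hypothetical stand-in for the real job (not from the original post).
    static String process(String id) throws InterruptedException {
        Thread.sleep(50);
        return "done:" + id;
    }

    // Question 1: submit a Callable and block on Future.get() for the result.
    static String runSynchronously(ExecutorService executor, String id)
            throws InterruptedException, ExecutionException {
        Future<String> future = executor.submit(() -> process(id));
        return future.get(); // blocks the caller until the task finishes
    }

    // Builds a 1-thread pool on a SynchronousQueue and keeps its only worker
    // busy until `release` is counted down.
    static ThreadPoolExecutor busyPool(RejectedExecutionHandler handler, CountDownLatch release)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), handler);
        CountDownLatch started = new CountDownLatch(1);
        pool.submit(() -> { started.countDown(); release.await(); return null; });
        started.await(); // the single worker is now occupied
        return pool;
    }

    // Question 2: with the default AbortPolicy, a second submission is rejected
    // because the SynchronousQueue cannot hold it and no extra thread is allowed.
    static boolean secondTaskIsRejected() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = busyPool(new ThreadPoolExecutor.AbortPolicy(), release);
        try {
            pool.submit(() -> process("second"));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    // One way to avoid rejection: CallerRunsPolicy runs the task on the submitting thread.
    static String callerRunsInsteadOfRejecting() throws InterruptedException, ExecutionException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = busyPool(new ThreadPoolExecutor.CallerRunsPolicy(), release);
        try {
            return pool.submit(() -> process("second")).get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            System.out.println(runSynchronously(single, "first"));
        } finally {
            single.shutdown();
        }
        System.out.println("rejected: " + secondTaskIsRejected());
        System.out.println(callerRunsInsteadOfRejecting());
    }
}
```

With `CallerRunsPolicy` the overflow task runs on the caller's own thread, which is effectively what the synchronous `doAction` above does when it computes the response directly. Another option, if tasks should wait for the worker instead, is to keep a `LinkedBlockingQueue` and simply call `get()` on the returned `Future`.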

huangapple
  • Posted on July 24, 2020 at 02:49:42
  • When reposting, please keep the link to this article: https://java.coder-hub.com/63061187.html