Achieving synchronized behavior with ThreadPoolExecutor using SynchronousQueue
Question
I am writing a service class that is supposed to have two modes: synchronous and asynchronous.
public class ProcessorImpl implements IProcessor {

    private final MyRepo repo;
    private final Jobrunner runner;
    private final boolean isAsync;
    private ExecutorService executorService;

    @ProcessorImpl
    public ProcessorImpl(final MyRepo repo,
                         final Jobrunner runner) {
        this(repo, runner, false);
    }

    public ProcessorImpl(final MyRepo repo,
                         final Jobrunner runner,
                         final boolean isAsync) {
        this.repo = repo;
        this.runner = runner;
        this.isAsync = isAsync;
    }

    @PostConstruct
    public void init() {
        if (isAsync) {
            executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50));
        } else {
            executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        }
    }

    @Override
    public Response doAction(final Request request, final String id) {
        if (isAsync) {
            // submit task and return incomplete response with id
        } else {
            // submit task, get result, and return the response returned by the Callable
        }
    }
}
The switching is driven by the isAsync flag.
In synchronous mode, I want to submit the task, get the completed result from the submission, and use it to build the Response to return. For this mode, I am using SynchronousQueue.
In asynchronous mode, I just submit the task and return immediately with an incomplete response. For this mode, I am using LinkedBlockingQueue to hold the tasks waiting in the queue.
I have two questions regarding synchronous mode:
- How do I get the result back and return the response synchronously? I am unsure how to use Future here.
- What if another task comes in while the thread is still processing a task? Will it get rejected? How can I avoid rejection?
Any help with some code samples will be appreciated.
Thanks
Answer 1
Score: 0
Something like this:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SyncProcessorImpl {

    private final MyRepo repo;
    private final Jobrunner runner;

    public SyncProcessorImpl(final MyRepo repo, final Jobrunner runner) {
        this.repo = repo;
        this.runner = runner;
    }

    public Response doAction(final Request request, final String id) {
        // do all calculations directly on the calling thread and return the complete response
    }
}

public class AsyncProcessorImpl extends SyncProcessorImpl {

    // single worker thread; up to 50 tasks may wait in the bounded queue
    private final ExecutorService executorService;

    public AsyncProcessorImpl(final MyRepo repo, final Jobrunner runner) {
        super(repo, runner);
        executorService = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50));
    }

    public Future<Response> doActionAsync(final Request request, final String id) {
        // submit the task and return the Future returned by submit(Callable)
        return executorService.submit(() -> super.doAction(request, id));
    }
}
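If you do want to keep an executor in synchronous mode, the two questions can be illustrated with a small standalone sketch. It is not the asker's code: the class name SyncOverExecutorSketch and the helpers runAndWait and secondTaskRejected are made up for illustration, and plain String results stand in for Response. It shows blocking on Future.get() to obtain the result, and what a 1/1 pool backed by SynchronousQueue does with a second task while its only worker is busy.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SyncOverExecutorSketch {

    /** Hypothetical helper: submits the task and blocks the caller until the result is ready. */
    static <T> T runAndWait(ExecutorService executor, Callable<T> task) throws Exception {
        Future<T> future = executor.submit(task);
        try {
            return future.get(); // blocks until the task has completed
        } catch (ExecutionException e) {
            // unwrap the exception thrown inside the task, if it is a checked/runtime exception
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    /**
     * Hypothetical helper: occupies the single worker, then submits a second task.
     * Returns true if the second submission was rejected, false if it ran.
     */
    static boolean secondTaskRejected(RejectedExecutionHandler handler) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), handler);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            executor.submit(() -> {
                started.countDown();
                release.await(); // keep the only worker busy until released
                return null;
            });
            started.await(); // the worker is now inside the first task
            try {
                Future<String> second = executor.submit(() -> "second");
                second.get();
                return false; // the handler let the task run
            } catch (RejectedExecutionException e) {
                return true; // no idle worker and SynchronousQueue holds nothing
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        try {
            System.out.println(runAndWait(executor, () -> "done"));
        } finally {
            executor.shutdown();
        }
        System.out.println(secondTaskRejected(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(secondTaskRejected(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

With the default AbortPolicy the second submission throws RejectedExecutionException, because a SynchronousQueue has no capacity and the pool cannot grow past one thread. With ThreadPoolExecutor.CallerRunsPolicy the rejected task runs on the submitting thread instead, which is one way to avoid the rejection; using a queue with capacity, as in the asynchronous configuration, is another.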