英文:
Selecting between task executor with LinkedBlockingQueue and SynchronousQueue
问题
我目前有一个处理器实现,它使用LinkedBlockingQueue
创建一个ThreadPoolExecutor
,以提供对传入作业/任务的异步提交。这样做的目的是为客户提供即时响应并返回一些初始响应。
下面的TradeProcessor
位于一个公共项目中(假设为项目A),并被其他项目(假设项目B和C)使用:
public class TradeProcessor implements Processor {
private final TestRepository repo;
private final JobRunner runner;
private int corePoolSize = 1; //始终为1
private int maxPoolSize = 1; //始终为1
private int keepAliveTime = 60;
private int queueSize = 10;
private ExecutorService executorService;
public AsyncProcessor(final TestRepository repo, final JobRunner runner) {
this.repo = repo;
this.runner = runner;
}
@PostConstruct
public void init() {
executorService = createNewThreadPoolExecutor();
}
@Override
public MyResponse process(final TestRequest request, final String id) {
executorService.execute(new JobRunTask(runner, repo, Pair.of(id, request)));
return new MyResponse(request.getName(), id, SUBMITTED, initialJobStatus());
}
private ThreadPoolExecutor createNewThreadPoolExecutor() {
return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, SECONDS,
new LinkedBlockingQueue<> (queueSize),
new ThreadFactoryBuilder().setNameFormat("abc-thread-%d").build(),
createRejectedExecutionHandler()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (null != t) {
//bla bla
}
}
};
}
private RejectedExecutionHandler createRejectedExecutionHandler() {
return (runnable, executor) -> {
//bla bla
};
}
}
我现在希望在上述处理器中添加一些同步功能。例如,某个项目可以选择“等待”响应,并获得实际的完整响应,而不是初始响应。当任务执行/提交并获得结果后,它会阻塞并返回响应。
我理解如果我使用SynchronousQueue
创建一个ThreadPoolExecutor
,可以满足这个需求。如何实现这一点,并根据各个项目提供的一些标志在同步和异步之间进行切换呢?
英文:
I currently have a processor implementation that is creating a ThreadPoolExecutor
using a LinkedBlockingQueue
as to offer async submissions of incoming jobs/tasks. The intention for this is to offer client an immediate response with some initial response.
The below TradeProcessor
sits in a common project (say Project A) and is used by other projects (say project B and C):
public class TradeProcessor implements Processor {
private final TestRepository repo;
private final JobRunner runner;
private int corePoolSize = 1 //always 1
private int maxPoolSize = 1; //always 1
private int keepAliveTime = 60
private int queueSize = 10;
private ExecutorService executorService;
public AsyncProcessor(final TestRepository repo, final JobRunner runner) {
this.repo = repo;
this.runner = runner;
}
@PostConstruct
public void init() {
executorService = createNewThreadPoolExecutor();
}
@Override
public MyResponse process(final TestRequest request, final String id) {
executorService.execute(new JobRunTask(runner, repo, Pair.of(id, request)));
return new MyResponse(request.getName(), id, SUBMITTED, initialJobStatus());
}
private ThreadPoolExecutor createNewThreadPoolExecutor() {
return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, SECONDS,
new LinkedBlockingQueue<>(queueSize),
new ThreadFactoryBuilder().setNameFormat("abc-thread-%d").build(),
createRejectedExecutionHandler()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (null != t) {
//bla bla
}
}
};
}
private RejectedExecutionHandler createRejectedExecutionHandler() {
return (runnable, executor) -> {
//bla bla
};
}
}
I am now looking to add some synchronous functionality to the above Processor. For example, A project may choose that it can wait for a response and have the actual full response back and not the initial one. It blocks when task executed/submitted and has the result back and sends response.
I understand I can satisfy this requirement if I create an ThreadPoolExecutor
using SynchronousQueue
? How can I implement this and switch between the two modes, sync and async, based on some flags that the individuals projects will provide?
答案1
得分: 0
你不需要使用SynchronousQueue,可以直接调用JobRunTask的方法,在你想要同步发生的时候获取响应。
Processor接口定义了三个方法:
interface Processor {
// 这将将请求放入队列并返回默认值
MyResponse process(final TestRequest request, final String id);
// 这将同步处理请求
MyResponse processSync(final TestRequest request, final String id);
// 这将返回一个Future,用于检查执行是否完成,同时返回一个响应
Future<MyResponse> processAsync(final TestRequest request, final String id);
}
你的JobRunTask类将定义两个公共方法,与只有一个方法相反,在其中一个方法中,它将通过执行业务逻辑返回响应,而在另一个方法中,它将作为可运行任务运行。
class JobRunTask implements Runnable {
@Override
public void run() {
MyResponse response = execute();
// 对响应进行处理
}
public MyResponse execute() {
// 你的业务逻辑
return null;
}
}
一旦你掌握了这两个接口,你可以实现你的TradeProcessor类如下:
public class TradeProcessor implements Processor {
private ExecutorService executorService;
private final JobRunner runner;
private final TestRepository repo;
@Override
public MyResponse process(final TestRequest request, final String id) {
executorService.execute(new JobRunTask(runner, repo, Pair.of(id, request)));
return new MyResponse(request.getName(), id, SUBMITTED, initialJobStatus());
}
@Override
public MyResponse processSync(final TestRequest request, final String id) {
return new JobRunTask(runner, repo, Pair.of(id, request)).execute();
}
@Override
public Future<MyResponse> processAsync(final TestRequest request, final String id) {
return executorService.submit(() -> new JobRunTask(runner, repo, Pair.of(id, request)));
}
}
其他初始化/对象创建等细节留给操作者处理。
使用这种方法,你不需要在不同的队列之间切换。它还允许调用者查看请求是否已完成,在调用者并行运行多个线程的情况下。
英文:
You do not have to use SynchrnousQueue, you can directly call the method of the JobRunTask to get the response when you want things to happen synchronously.
Processor interface defines three method
interface Processor {
// This will enqueue the request and returns default value
MyResponse process(final TestRequest request, final String id);
// this will process request in sync
MyResponse processSync(final TestRequest request, final String id);
// this returns a future that can be used to check if the execution is completed or not and also returns a response.
Future<MyResponse> processAsync(final TestRequest request, final String id);
}
Your JobRunTask class would define two public methods as opposed to one method, in one of the method it will return the response by executing the business logic while in other it will run as a runnable task.
class JobRunTask implements Runnable {
@Override
public void run() {
MyResponse response = execute();
// do something with response
}
public MyResponse execute() {
// your business logic
return null;
}
}
Once you have these two interfaces handy you can implement your TradeProcessor class as.
public class TradeProcessor implements Processor {
private ExecutorService executorService;
private final JobRunner runner;
private final TestRepository repo;
@Override
public MyResponse process(final TestRequest request, final String id) {
executorService.execute(new JobRunTask(runner, repo, Pair.of(id, request)));
return new MyResponse(request.getName(), id, SUBMITTED, initialJobStatus());
}
@Override
public MyResponse processSync(final TestRequest request, final String id) {
return new JobRunTask(runner, repo, Pair.of(id, request)).execute();
}
@Override
public Future<MyResponse> processAsync(final TestRequest request, final String id) {
return executorService.submit(() -> new JobRunTask(runner, repo, Pair.of(id, request)));
}
}
Other details like initialization/object creations etc have been left for the op.
Using this you would not have to switch from one queue to another. It will also allow the caller to see the request was completed or not, in case the caller runs multiple threads in parallel.
专注分享java语言的经验与见解,让所有开发者获益!
评论