标题翻译
Is there a better way for cancelling a chain of futures in java?
问题
在Scala中使用Twitter Futures,我只需要这样做,就可以默认地通过中断取消整个未来链,从而防止不必要的工作。 (但为了简化事情,下面的代码是用Java编写的)
private CompletableFuture<Integer> startProcess(int i) {
CompletableFuture<Integer> future1 = remoteCall(5);
CompletableFuture<Integer> future2 = future1.thenCompose(s -> remoteCall(10));
CompletableFuture<Integer> future3 = future2.thenCompose(s -> remoteCall(15));
CompletableFuture<Integer> future4 = future3.thenCompose(s -> remoteCall(20));
return future4;
}
在上面的Scala代码中,所有的未来(futures)都会被取消,除非在其中一个上安装了中断处理程序(在分叉情况下可能会这样做,但这与本文无关)。
然而在Java中,我似乎需要进行以下操作(但我想知道是否有更好的方法):
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class FutureTest {
private Executor exec = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
new FutureTest().start();
}
private void start() {
CompletableFuture<Integer> f = startProcess(5);
f.cancel(false);
}
private CompletableFuture<Integer> startProcess(int i) {
CompletableFuture<Integer> future1 = remoteCall(5);
CompletableFuture<Integer> future2 = future1.thenCompose(s -> remoteCall(10));
CompletableFuture<Integer> future3 = future2.thenCompose(s -> remoteCall(15));
AtomicBoolean cancelled = new AtomicBoolean(false);
CompletableFuture<Integer> future4 = future3.thenCompose(s -> {
if (cancelled.get()) {
return CompletableFuture.completedFuture(null);
}
return remoteCall(20);
});
future4.exceptionally(t -> {
if (t instanceof CancellationException) {
cancelled.set(true);
}
return null;
});
return future4;
}
public CompletableFuture<Integer> remoteCall(int value) {
CompletableFuture<Integer> future = new CompletableFuture<>();
exec.execute(() -> {
System.out.println("Running task=" + value);
try {
Thread.sleep(5000);
future.complete(value + 4);
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
});
return future;
}
}
这似乎是Java未来(futures)的一个很大的不足之处,因为在大多数微服务中,你只需链式地调用几个远程调用,如果某个客户端断开连接,导致我取消了顶级未来,大多数情况下,我希望整个链都被取消,以便我们不会向永远不会响应客户端的数据库发送请求。
当然,你可以使用future1.thenApply(doSomething1)
和future1.thenApply(doSomething2)
同时并行地做两件事情,但这就是中断(interrupts)发挥作用的地方。默认情况应该是取消整个未来链。
作为一个副笔,每当我看到Java试图采用开源项目中的某些内容,比如日志记录,他们总是会放入一个非常简单的实现,比原来的要不可用得多,就像他们试图创建java.util.logging
来替代log4j
一样,当然log4j
(以及没有slf4j
)要好得多。我想知道我们是否需要创建一个实际可用于以下情况的开源CompletableFuture
:
- 上述取消情况
- 拥有类似
Local.scala
的Local.java
,它可以像ThreadLocal
一样跟踪请求,使slf4j
中的MDC与CompletableFutures
一起工作!(目前,MDC
在slf4j
中仅在Scala的Twitter Futures中起作用)。在Java的CompletableFutures
中,它会丢失状态,因此log.info()
无法获取MDC
数据。
关于中断/取消,这篇文章中有一些很好的信息,其中有一个完整的章节:https://twitter.github.io/finagle/guide/developers/Futures.html,涉及到Twitter Futures和Java Futures。
谢谢你的任何见解。
Dean
英文翻译
In scala with twitter Futures, I would simply do this and it would cancel a whole chain of futures (via interrupts) by default PREVENTING unecessary work being done. (CODE WRITTEN in JAVA though to make things easier)
private CompletableFuture<Integer> startProcess(int i) {
CompletableFuture<Integer> future1 = remoteCall(5);
CompletableFuture<Integer> future2 = future1.thenCompose(s -> remoteCall(10));
CompletableFuture<Integer> future3 = future2.thenCompose(s -> remoteCall(15));
CompletableFuture<Integer> future4 = future3.thenCompose(s -> remoteCall(20));
return future4;
}
All futures above would be cancelled in scala UNLESS some interrupt handler is installed on one of them(which I might do in a forking case but that is irrelevant for this post).
In java, I have to do this nastiness it seems (but I am wondering if there is a better way)..
public class FutureTest {
private Executor exec = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
new FutureTest().start();
}
private void start() {
CompletableFuture<Integer> f = startProcess(5);
f.cancel(false);
}
private CompletableFuture<Integer> startProcess(int i) {
CompletableFuture<Integer> future1 = remoteCall(5);
CompletableFuture<Integer> future2 = future1.thenCompose(s -> remoteCall(10));
CompletableFuture<Integer> future3 = future2.thenCompose(s -> remoteCall(15));
future3.exceptionally(t -> {
if(t instanceof CancellationException) {
future2.cancel(false);
t.printStackTrace();
}
return null;
});
CompletableFuture<Integer> future4 = future3.thenCompose(s -> remoteCall(20));
future4.exceptionally(t -> {
if(t instanceof CancellationException) {
future3.cancel(false);
t.printStackTrace();
}
return null;
});
return future4;
}
public CompletableFuture<Integer> remoteCall(int value) {
CompletableFuture<Integer> future = new CompletableFuture<Integer>();
exec.execute(new Runnable() {
@Override
public void run() {
System.out.println("RUnning task="+value);
try {
Thread.sleep(5000);
future.complete(value+4);
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
}
});
return future;
}
}
It seems to me that this is a big deficiency of java futures because 'most' of the time, in microservices you just chain a few remote calls to happen and if some client disconnects causing me to cancel the top level future, MOST of the time, I want the whole chain to cancel so we don't send requests to the DB's that will never respond to clients anyways.
Of course, you may have future1.thenApply(doSomething1) and future1.thenApply(doSomething2) so you can do 2 things in parallel BUT that is where interrupts come in. The default case should be to cancel everything up the chain of futures.
I guess as a side note, every time I see java try to take something that someone did in open source like logging, they completely put in a very minimal implementation that is way less usable than the original like them trying to create java.util.logging to replace log4j but of course log4j (and no slf4j) was way way better :(.
I am wondering if we need to create an open source CompletableFuture that actually works for
- The above cancel case
- Having a Local.java (like the Local.scala) that follows the request through like a ThreadLocal does so the MDC in slf4j works with CompletableFutures! (Currently MDC in slf4j only works with scala twitter futures). It breaks in java's CompletableFutures losing the state so log.info() can't grab the MDC data.
There is some great info and a whole section on interruption/cancellation in this article
https://twitter.github.io/finagle/guide/developers/Futures.html regarding twitter futures and java futures.
thanks for any insight on this.
Dean
专注分享java语言的经验与见解,让所有开发者获益!
评论