Is there a good way to add FutureLocal.java to custom Future.java extending CompletableFuture? (example code below)


Question


I have the following code, which is close to working except for one critical problem: when I call super.thenCompose, it returns a plain CompletableFuture instead of my custom Future.java. I am trying to replicate two features of Twitter's Scala futures:

  1. Cancellation chaining, like Twitter's Scala futures.

  2. A request context that flows through the thenApply and thenCompose chains to fix MDC in slf4j (much like a ThreadLocal, but re-applied just before each lambda runs, as seen in the code below).

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.function.Function;

    public class Future<T> extends CompletableFuture<T> {

        @Override
        public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
            // Capture the caller's context now; MyFunction re-applies it when fn runs.
            Map<String, Object> state = FutureLocal.fetchState();
            return super.thenApply(new MyFunction<T, U>(state, fn));
        }

        @Override
        public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
            Map<String, Object> state = FutureLocal.fetchState();
            return super.thenCompose(new MyFunction<T, CompletionStage<U>>(state, fn));
        }

        // Wraps a callback so the captured context is installed only while it runs.
        private static final class MyFunction<A, B> implements Function<A, B> {

            private final Map<String, Object> state;
            private final Function<? super A, ? extends B> fn;

            MyFunction(Map<String, Object> state, Function<? super A, ? extends B> fn) {
                this.state = state;
                this.fn = fn;
            }

            @Override
            public B apply(A t) {
                try {
                    FutureLocal.restoreState(state);
                    return fn.apply(t);
                } finally {
                    // Clear the context so it does not leak into later tasks on this thread.
                    FutureLocal.restoreState(null);
                }
            }
        }

        @Override
        public boolean complete(T value) {
            return super.complete(value);
        }

        @Override
        public boolean completeExceptionally(Throwable ex) {
            return super.completeExceptionally(ex);
        }
    }
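
The question never shows FutureLocal itself. Below is a minimal sketch of what it might look like, assuming a ThreadLocal-backed map and the four static methods that the code calls (fetchState, restoreState, put, get). Copying the map on capture and on restore is an assumption made here for safety, not something the original states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the real FutureLocal is not shown in the question.
final class FutureLocal {

    // Per-thread context map; an unset value means "no context on this thread".
    private static final ThreadLocal<Map<String, Object>> STATE = new ThreadLocal<>();

    private FutureLocal() {}

    // Snapshot the calling thread's context so a callback can carry it along.
    static Map<String, Object> fetchState() {
        Map<String, Object> current = STATE.get();
        return current == null ? null : new HashMap<>(current);
    }

    // Install a captured snapshot on the current thread; null clears the context.
    static void restoreState(Map<String, Object> state) {
        if (state == null) {
            STATE.remove();
        } else {
            STATE.set(new HashMap<>(state));
        }
    }

    static void put(String key, Object value) {
        Map<String, Object> current = STATE.get();
        if (current == null) {
            current = new HashMap<>();
            STATE.set(current);
        }
        current.put(key, value);
    }

    static Object get(String key) {
        Map<String, Object> current = STATE.get();
        return current == null ? null : current.get(key);
    }
}
```

With a helper like this, a lambda only sees the context if the wrapper that runs it called restoreState first; a callback registered on a plain CompletableFuture runs with whatever the executing thread happens to hold.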

Here is the test I use to exercise that class. Looking up "test" in the map starts failing on the third remote call, which means slf4j MDC would break down in the same way.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.junit.Test; // assuming JUnit 4; with JUnit 5 use org.junit.jupiter.api.Test

public class TestCustomFutures {

    private final Executor exec = Executors.newFixedThreadPool(3);

    @Test
    public void testFutureContext() throws InterruptedException, ExecutionException {
        FutureLocal.put("test", 100);

        CompletableFuture<Integer> f = myRemoteCall(4)
            .thenCompose(s -> myRemoteCall(3))
            .thenCompose(s -> myRemoteCall(2));

        f.get();
    }

    // Simulates a remote call: logs the context the caller sees, then completes on a pool thread.
    private Future<Integer> myRemoteCall(int i) {
        System.out.println("result=" + i + " map=" + FutureLocal.get("test") + " thread=" + Thread.currentThread().getName());

        Future<Integer> f = new Future<Integer>();

        exec.execute(() -> {
            try {
                Thread.sleep(1000);
                f.complete(i);
            } catch (InterruptedException e) {
                // Fail the future instead of also completing it normally.
                f.completeExceptionally(e);
            }
        });

        return f;
    }
}

The output is:

result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2

Note the last line: map should be 100 there, not null.

Answer 1

Score: 0


Ahh, I was missing one simple thing because I was on JDK 8. On JDK 11, however, you can override this method, which thenApply and thenCompose use to create the future they return:

@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
    return new Future<U>();
}
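
To show how that override fits together with the context wrapping from the question, here is a small self-contained sketch for Java 9 or later. The names ContextFuture and REQUEST_ID are hypothetical stand-ins for the question's Future and FutureLocal, and a single String replaces the context map to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical stand-in for the question's custom Future class. Requires Java 9+.
class ContextFuture<T> extends CompletableFuture<T> {

    // Stand-in for FutureLocal: one per-thread request id instead of a map.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Java 9+: dependent stages are created through this factory method,
    // so the whole chain stays in the custom type.
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new ContextFuture<>();
    }

    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        String captured = REQUEST_ID.get(); // captured when the stage is registered
        return super.thenApply(t -> {
            String previous = REQUEST_ID.get();
            REQUEST_ID.set(captured);       // re-applied just before fn runs
            try {
                return fn.apply(t);
            } finally {
                REQUEST_ID.set(previous);
            }
        });
    }

    @Override
    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
        String captured = REQUEST_ID.get();
        return super.thenCompose(t -> {
            String previous = REQUEST_ID.get();
            REQUEST_ID.set(captured);
            try {
                return fn.apply(t);
            } finally {
                REQUEST_ID.set(previous);
            }
        });
    }
}
```

Because every stage returned by thenApply or thenCompose is again a ContextFuture, the second call in a chain also goes through the overriding method and captures the context, which is the step that was lost in the question's output.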

On JDK 8 the newIncompleteFuture() override does not compile and is never invoked, because that method was only added to CompletableFuture in Java 9 :(. Crap, I didn't want to upgrade to 11 yet, as some usages are still on JDK 8 :(.
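
For code that has to stay on JDK 8, one possible workaround is to stop delegating to super.thenApply and super.thenCompose and to build the dependent future by hand. The sketch below is only an illustration under several assumptions: Jdk8ContextFuture and REQUEST_ID are hypothetical names, only thenApply and thenCompose are covered, exceptions are passed through without CompletionException wrapping, and cancellation is not propagated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical JDK 8-compatible variant: creates the dependent future itself.
class Jdk8ContextFuture<T> extends CompletableFuture<T> {

    // Stand-in for FutureLocal: one per-thread request id instead of a map.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Covariant return type: callers get the custom type back.
    @Override
    public <U> Jdk8ContextFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        String captured = REQUEST_ID.get();
        Jdk8ContextFuture<U> result = new Jdk8ContextFuture<>();
        super.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
                return;
            }
            String previous = REQUEST_ID.get();
            REQUEST_ID.set(captured); // re-apply the context just before fn runs
            try {
                result.complete(fn.apply(value));
            } catch (Throwable t) {
                result.completeExceptionally(t);
            } finally {
                REQUEST_ID.set(previous);
            }
        });
        return result;
    }

    @Override
    public <U> Jdk8ContextFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
        String captured = REQUEST_ID.get();
        Jdk8ContextFuture<U> result = new Jdk8ContextFuture<>();
        super.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
                return;
            }
            String previous = REQUEST_ID.get();
            REQUEST_ID.set(captured);
            try {
                // Relay the inner stage's outcome into the future we handed out.
                fn.apply(value).whenComplete((inner, innerError) -> {
                    if (innerError != null) {
                        result.completeExceptionally(innerError);
                    } else {
                        result.complete(inner);
                    }
                });
            } catch (Throwable t) {
                result.completeExceptionally(t);
            } finally {
                REQUEST_ID.set(previous);
            }
        });
        return result;
    }
}
```

The trade-off is that each chaining method you rely on (thenAccept, handle, the *Async variants, and so on) would need the same treatment, whereas on Java 9+ a single newIncompleteFuture() override covers the return type for all of them.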

huangapple
  • Posted on May 31, 2020, 06:39:11
  • When reposting, please keep the link to this article: https://java.coder-hub.com/62109484.html