Java 8引入CompletableFuture
了一个可组合的Future的新实现(包括一堆thenXxx方法).我想独占使用它,但我想使用的许多库只返回不可组合的Future
实例.
有没有办法将返回的Future
实例包装在一个内部,CompleteableFuture
以便我可以编写它?
有一种方法,但你不会喜欢它.以下方法将a Future
转换为CompletableFuture
:
public staticCompletableFuture makeCompletableFuture(Future future) { if (future.isDone()) return transformDoneFuture(future); return CompletableFuture.supplyAsync(() -> { try { if (!future.isDone()) awaitFutureIsDoneInForkJoinPool(future); return future.get(); } catch (ExecutionException e) { throw new RuntimeException(e); } catch (InterruptedException e) { // Normally, this should never happen inside ForkJoinPool Thread.currentThread().interrupt(); // Add the following statement if the future doesn't have side effects // future.cancel(true); throw new RuntimeException(e); } }); } private static CompletableFuture transformDoneFuture(Future future) { CompletableFuture cf = new CompletableFuture<>(); T result; try { result = future.get(); } catch (Throwable ex) { cf.completeExceptionally(ex); return cf; } cf.complete(result); return cf; } private static void awaitFutureIsDoneInForkJoinPool(Future> future) throws InterruptedException { ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() { @Override public boolean block() throws InterruptedException { try { future.get(); } catch (ExecutionException e) { throw new RuntimeException(e); } return true; } @Override public boolean isReleasable() { return future.isDone(); } }); }
显然,这种方法的问题在于,对于每个Future,线程将被阻塞以等待Future的结果 - 与未来的想法相对立.在某些情况下,可能会做得更好.但是,一般来说,没有积极等待未来结果就没有解决方案.
如果您要使用的库除了Future样式之外还提供回调样式方法,您可以为它提供一个处理程序来完成CompletableFuture而不会有任何额外的线程阻塞.像这样:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file")); // ... CompletableFuturecompletableFuture = new CompletableFuture (); open.read(buffer, position, null, new CompletionHandler () { @Override public void completed(Integer result, Void attachment) { completableFuture.complete(buffer); } @Override public void failed(Throwable exc, Void attachment) { completableFuture.completeExceptionally(exc); } }); completableFuture.thenApply(...)
如果没有回调,我看到解决这个问题的另一种方法是使用一个轮询循环,将所有Future.isDone()
检查放在一个线程上,然后在Future可获取时调用complete.
我发布了一个小小的未来项目,试图比答案中的直接方式做得更好.
主要思想是使用唯一的一个线程(当然不仅仅是一个旋转循环)来检查里面的所有Futures状态,这有助于避免为每个Future - > CompletableFuture转换阻塞来自池的线程.
用法示例:
Future oldFuture = ...; CompletableFuture profit = Futurity.shift(oldFuture);
建议:
http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/
但是,基本上:
public class CompletablePromiseContext { private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor(); public static void schedule(Runnable r) { SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS); } }
并且,CompletablePromise:
public class CompletablePromiseextends CompletableFuture { private Future future; public CompletablePromise(Future future) { this.future = future; CompletablePromiseContext.schedule(this::tryToComplete); } private void tryToComplete() { if (future.isDone()) { try { complete(future.get()); } catch (InterruptedException e) { completeExceptionally(e); } catch (ExecutionException e) { completeExceptionally(e.getCause()); } return; } if (future.isCancelled()) { cancel(true); return; } CompletablePromiseContext.schedule(this::tryToComplete); } }
例:
public class Main { public static void main(String[] args) { final ExecutorService service = Executors.newSingleThreadExecutor(); final FuturestringFuture = service.submit(() -> "success"); final CompletableFuture completableFuture = new CompletablePromise<>(stringFuture); completableFuture.whenComplete((result, failure) -> { System.out.println(result); }); } }
让我建议另一个(希望,更好)选项:https: //github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /同时
简而言之,这个想法如下:
介绍CompletableTask
接口 - CompletionStage
+ 的并集
RunnableFuture
变形从方法ExecutorService
返回(而不是)CompletableTask
submit(...)
Future
完成后,我们有可运行和可组合的期货.
实现使用替代的CompletionStage实现(注意,CompletionStage而不是CompletableFuture):
用法:
J8ExecutorService exec = J8Executors.newCachedThreadPool(); CompletionStage= exec .submit( someCallableA ) .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b) .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);