当前位置:  开发笔记 > 编程语言 > 正文

将Java Future转换为CompletableFuture

如何解决《将JavaFuture转换为CompletableFuture》经验,为你挑选了5个好方法。

Java 8引入CompletableFuture了一个可组合的Future的新实现(包括一堆thenXxx方法).我想独占使用它,但我想使用的许多库只返回不可组合的Future实例.

有没有办法将返回的Future实例包装在一个内部,CompleteableFuture以便我可以编写它?



1> nosid..:

有一种方法,但你不会喜欢它.以下方法将a Future转换为CompletableFuture:

public static  CompletableFuture makeCompletableFuture(Future future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static  CompletableFuture transformDoneFuture(Future future) {
  CompletableFuture cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

显然,这种方法的问题在于,对于每个Future,线程将被阻塞以等待Future的结果 - 与未来的想法相对立.在某些情况下,可能会做得更好.但是,一般来说,没有积极等待未来结果就没有解决方案.


嗯...这个解决方案不是吃"公共池"中的一个线程,只是为了等待?那些"公共池"线程永远不应该阻止......嗯......
这可能并不完美,但是使用CompletableFuture.supplyAsync(supplier,new SinglethreadExecutor())至少不会阻塞公共池线程。
求求你,永远不要那样做

2> Kafkaesque..:

如果您要使用的库除了Future样式之外还提供回调样式方法,您可以为它提供一个处理程序来完成CompletableFuture而不会有任何额外的线程阻塞.像这样:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture completableFuture = new CompletableFuture();
    open.read(buffer, position, null, new CompletionHandler() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

如果没有回调,我看到解决这个问题的另一种方法是使用一个轮询循环,将所有Future.isDone()检查放在一个线程上,然后在Future可获取时调用complete.



3> Dmitry Spikh..:

我发布了一个小小的未来项目,试图比答案中的直接方式做得更好.

主要思想是使用唯一的一个线程(当然不仅仅是一个旋转循环)来检查里面的所有Futures状态,这有助于避免为每个Future - > CompletableFuture转换阻塞来自池的线程.

用法示例:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);



4> Gabriel Fran..:

建议:

http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

但是,基本上:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

并且,CompletablePromise:

public class CompletablePromise extends CompletableFuture {
    private Future future;

    public CompletablePromise(Future future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

例:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future stringFuture = service.submit(() -> "success");
        final CompletableFuture completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}



5> Valery Silae..:

让我建议另一个(希望,更好)选项:https: //github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /同时

简而言之,这个想法如下:

    介绍CompletableTask接口 - CompletionStage+ 的并集 RunnableFuture

    变形从方法ExecutorService返回(而不是)CompletableTasksubmit(...)Future

    完成后,我们有可运行和可组合的期货.

实现使用替代的CompletionStage实现(注意,CompletionStage而不是CompletableFuture):

用法:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 


小更新:代码被移动到单独的项目,https://github.com/vsilaev/tascalate-concurrent,现在可以使用来自java.util.concurrent的远程框Executor-s.
推荐阅读
k78283381
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有