Я пытаюсь преобразовать List<CompletableFuture<X>>
в CompletableFuture<List<T>>
. Это очень полезно, когда у вас много асинхронных задач и вам нужно получить результаты всех из них.
Если кто-то из них потерпит неудачу, окончательное будущее не удастся. Вот как я реализовал:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
Чтобы запустить его:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
Если какой-либо из них терпит неудачу, значит, он терпит неудачу. Он дает ожидаемый результат, даже если существует миллион фьючерсов. У меня есть проблема: скажем, если существует более 5000 фьючерсов и если какое-либо из них не сработает, я получаю StackOverflowError
:
Исключение в потоке «пул-1-поток-2611» java.lang.StackOverflowError в java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) в java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.java : 1487) на java.util.concurrent.CompletableFuture.postComplete (CompletableFuture.java:193) на java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) в java.util.concurrent (.CompletableFutrun $ ThenCompose CompletableFuture.java:1487)
Что я делаю не так?
Примечание. Вышеупомянутое возвращенное будущее не работает, когда не удается выполнить какое-либо из будущих событий. В принятом ответе следует также учитывать этот момент.
Collector
... - person fge   schedule 04.05.2015