Список ‹Future› в будущее ‹List› последовательность

Я пытаюсь преобразовать List<CompletableFuture<X>> в CompletableFuture<List<T>>. Это очень полезно, когда у вас много асинхронных задач и вам нужно получить результаты всех из них.

Если кто-то из них потерпит неудачу, окончательное будущее не удастся. Вот как я реализовал:

  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
        if(com.isEmpty()){
            throw new IllegalArgumentException();
        }
        Stream<? extends CompletableFuture<T>> stream = com.stream();
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
        return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
            x.add(y);
            return x;
        },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
            ls1.addAll(ls2);
            return ls1;
        },exec));
    }

Чтобы запустить его:

ExecutorService executorService = Executors.newCachedThreadPool();
        Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x;
        }, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

Если какой-либо из них терпит неудачу, значит, он терпит неудачу. Он дает ожидаемый результат, даже если существует миллион фьючерсов. У меня есть проблема: скажем, если существует более 5000 фьючерсов и если какое-либо из них не сработает, я получаю StackOverflowError:

Исключение в потоке «пул-1-поток-2611» java.lang.StackOverflowError в java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) в java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.java : 1487) на java.util.concurrent.CompletableFuture.postComplete (CompletableFuture.java:193) на java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) в java.util.concurrent (.CompletableFutrun $ ThenCompose CompletableFuture.java:1487)

Что я делаю не так?

Примечание. Вышеупомянутое возвращенное будущее не работает, когда не удается выполнить какое-либо из будущих событий. В принятом ответе следует также учитывать этот момент.


person Jatin    schedule 04.05.2015    source источник
comment
На вашем месте я бы вместо этого реализовал Collector ...   -  person fge    schedule 04.05.2015
comment
@fge Это действительно очень хорошее предложение. Я приехал из мира scala, где у нас есть похожая вещь. Коллекционер мог бы здесь больше подойти. Но тогда реализация, я полагаю, может быть похожей.   -  person Jatin    schedule 04.05.2015


Ответы (9)


Используйте 1_:

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

Несколько комментариев по поводу вашей реализации:

Использование вами .thenComposeAsync, .thenApplyAsync и .thenCombineAsync, скорее всего, не соответствует вашим ожиданиям. Эти ...Async методы запускают предоставленную им функцию в отдельном потоке. Итак, в вашем случае вы вызываете добавление нового элемента в список для запуска в предоставленном исполнителе. Нет необходимости вставлять легковесные операции в кэширующий исполнитель потока. Не используйте thenXXXXAsync методы без уважительной причины.

Кроме того, reduce не следует использовать для накопления в изменяемых контейнерах. Даже если это может работать правильно, когда поток является последовательным, это не удастся, если поток должен быть параллельным. Чтобы выполнить изменяемое сокращение, используйте вместо этого .collect.

Если вы хотите завершить все вычисления в исключительных случаях сразу после первого сбоя, сделайте следующее в своем sequence методе:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

Если, кроме того, вы хотите отменить оставшиеся операции при первом сбое, добавьте exec.shutdownNow(); сразу после result.completeExceptionally(ex);. Это, конечно, предполагает, что exec существует только для этого одного вычисления. Если этого не произойдет, вам придется перебирать и отменять каждый оставшийся Future индивидуально.

person Misha    schedule 04.05.2015
comment
Я не понимаю, что тип возвращаемого значения allof - CompletableFuture<Void>, и мы возвращаем CompletableFuture<List<T>> без предупреждения компилятора. Я не осознавал эту природу пустоты - person Jatin; 04.05.2015
comment
В моем случае вполне нормально использовать ArrayList, потому что операция сложения не может выполняться параллельно. Только когда завершится предыдущий, мы добавляем элемент и так далее. Таким образом, сложение никогда не будет выполняться параллельно. - person Jatin; 04.05.2015
comment
См. Часть .thenApply. После успешного завершения allOf(...) он собирает все полученные значения в список. - person Misha; 04.05.2015
comment
Я согласен с большей частью ответа, кроме части ArrayList. его совершенно безопасно использовать в моем коде. В противном случае это лучший код, чем мой (просто вы повторяете список несколько раз. Это нормально, учитывая потерянную многословность). - person Jatin; 04.05.2015
comment
@Jatin Думаю, ты прав в этом. Я подумаю над этим утром, когда проснусь, и соответствующим образом изменю свой ответ. - person Misha; 04.05.2015
comment
@Jatin Вы правы, в рамках текущей реализации reduce, пока поток в методе sequence2 сохраняется последовательным, ArrayList безопасен. Однако очень нежелательно писать конструкции потока, которые ломаются, если поток был сделан параллельным. По крайней мере, если вы полагаетесь на то, что поток является последовательным, третий аргумент reduce должен быть (a, b) -> {throw new IllegalStateException("Parallel not allowed");} - person Misha; 05.05.2015
comment
Хороший момент! Спасибо. Я думаю, что sequence2 было плохой идеей. Коллекционер, безусловно, лучше, и ваш ответ тоже. - person Jatin; 05.05.2015
comment
Здесь есть один недостаток. CompletableFuture.allOf завершается только тогда, когда все завершено. Даже если один из них терпит неудачу, он все равно ждет ответа от других. - person Jatin; 15.05.2015
comment
Именно так будет вести себя ваше исходное решение (с использованием thenCombine). Если вы хотите замкнуть вычисление и немедленно запустить исключительное завершение, это легко сделать. См. Обновленный ответ. - person Misha; 17.05.2015
comment
Для завершения :-P я думаю, что этому коду нужен статический импорт java.util.stream.Collectors.toList - person jneira; 24.11.2015
comment
Это не компилируется: Type mismatch: cannot convert from CompletableFuture<Object> to CompletableFuture<List<T>>. JDK 8u66 - person Dirk Hillbrecht; 10.01.2016
comment
@Misha Я не понимаю преимуществ использования allOf в вашем коде. Отдельные задачи вызываются последовательно с помощью join, поэтому задача i + 1 не вызывается, пока задача i не будет завершена. Я не нашел в документации ничего, что позволяло бы запускать все подзадачи параллельно. Самая близкая вещь, кажется, ForkJoinPool .invokeAll, которая занимает кучу Callable. - person Abhijit Sarkar; 18.01.2016
comment
@AbhijitSarkar Задачи не вызываются join. Преимущество использования allOf заключается в том, что при срабатывании allOf триггера все задачи выполнены, а join просто получает результаты. - person Misha; 19.01.2016
comment
@DirkHillbrecht вы используете eclipse? - person Misha; 19.01.2016
comment
@Misha Не могли бы вы подробнее рассказать, когда вызываются отдельные задачи и как контролируется максимальный параллелизм? Если я отправлю сотни задач с использованием allOf, не удастся вызвать все из них одновременно, поскольку в системе просто не хватит для этого ресурсов, помимо полного отказа от цели объединения потоков. - person Abhijit Sarkar; 20.01.2016
comment
@AbhijitSarkar Это выходит за рамки этого метода. Назначение пулам потоков выполняется при создании отдельных CompletableFuture. Вполне возможно, что все задачи были запланированы в одном потоке, и все это будет выполняться последовательно. Также возможно, что они были запланированы в гигантском пуле потоков, и все будет работать одновременно. В любом случае allOf обеспечит выполнение всех задач до запуска. - person Misha; 20.01.2016
comment
@Misha, да, Eclipse Mars.1 с использованием JDK 8u66. - person Dirk Hillbrecht; 20.01.2016
comment
@Misha Если мы предоставим этой функции список завершаемых фьючерсов, поддерживаемых потоком (созданным с помощью supplyAsync), и одно из фьючерсов терпит неудачу, то что произойдет с остальными? Они брошены как зомби-нити? - person cubuspl42; 22.08.2017
comment
Разве прямого CompletableFuture.supplyAsync(() -> com.parallelStream().map(CompletableFuture::join).collect(toList), exec) не будет достаточно? - person charlie; 13.02.2018
comment
Я знаю, что это немного старая тема. Я попытался выполнить упомянутое решение в своей среде Windows eclipse. Однако каждый раз, когда я выполняю, он завершается при первом возникновении исключения. Это из-за того, что result.completeExceptionally(ex) выполняется и убивает все оставшиеся потоки. Если да, что я могу сделать, чтобы обеспечить выполнение и остальных потоков. - person NIGAGA; 22.08.2019
comment
@NIGAGA Версия кода с result.completeExceptionally специально предназначена для завершения после первого сбоя (см. Описание перед листингом кода). Первая версия с allOf будет ждать завершения всех задач (успешно или нет) перед завершением в будущем. Если это вам не поможет, задайте новый вопрос и укажите свой код. - person Misha; 22.08.2019
comment
@Misha Спасибо, теперь я понимаю. - person NIGAGA; 22.08.2019
comment
Рекомендуется: Массивы мудрости древних. Короче говоря, используйте toArray(new CompletableFuture<?>[0]). Это проще и менее подвержено ошибкам и более эффективно в наиболее распространенной реализации JVM… - person Holger; 29.11.2019

Как указал Миша, вы злоупотребляете …Async операциями. Кроме того, вы составляете сложную цепочку операций, моделирующую зависимость, не отражающую логику вашей программы:

  • вы создаете задание x, которое зависит от первого и второго задания в вашем списке
  • вы создаете задание x + 1, которое зависит от задания x и третьего задания в вашем списке
  • вы создаете задание x + 2, которое зависит от задания x + 1 и 4-го задания в вашем списке
  • вы создаете вакансию x + 5000, которая зависит от вакансии x + 4999 и последней вакансии в вашем списке

Затем отмена (явно или из-за исключения) этого рекурсивно составленного задания может выполняться рекурсивно и может завершиться ошибкой с StackOverflowError. Это зависит от реализации.

Как уже было показано Мишей, существует метод _ 3_, который позволяет моделировать ваш исходное намерение определить одну работу, которая зависит от всех заданий из вашего списка.

Однако стоит отметить, что даже в этом нет необходимости. Поскольку вы используете неограниченный исполнитель пула потоков, вы можете просто опубликовать асинхронное задание, собирающее результаты в список, и все готово. Ожидание завершения подразумевается, в любом случае запрашивая результат каждого задания.

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

Использование методов для составления зависимых операций важно, когда количество потоков ограничено и задания могут порождать дополнительные асинхронные задания, чтобы избежать того, чтобы ожидающие задания крадут потоки из заданий, которые должны завершиться первыми, но это не тот случай.

В этом конкретном случае одно задание, просто повторяющее это большое количество предварительных заданий и ожидание, если необходимо, может быть более эффективным, чем моделирование этого большого количества зависимостей и получение каждым заданием уведомления зависимого задания о завершении.

person Holger    schedule 04.05.2015
comment
Одно предостережение заключается в том, что использование supplyAsync вместо allOf будет потреблять поток из пула, чтобы ожидать завершения всех задач. Если я не ошибаюсь, allOf будет работать в потоках, назначенных для соответствующих задач. Не имеет большого значения для большинства случаев использования, но стоит отметить. - person Misha; 05.05.2015
comment
@Misha: Я действительно упомянул, что он украдет поток, если количество потоков ограничено, и что он работает здесь, потому что используется неограниченный исполнитель пула потоков (и не создаются асинхронные подзадачи). - person Holger; 05.05.2015
comment
@Holger Проблема с этим ответом заключается в том, что: если какой-либо из последующих вариантов будущего терпит неудачу, он по-прежнему ждет, пока к нему присоединятся, чтобы завершить. Скорее, как только что-то терпит неудачу, возвращенное будущее должно быть немедленно провалено. - person Jatin; 15.05.2015
comment
Собственно, меня даже это устраивает. но не воровство нити. - person Jatin; 15.05.2015

Вы можете получить библиотеку Spotify CompletableFutures и использовать _ 2_ метод. Думаю, он вдохновлен _ 3_.

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {

А вот простая реализация, если вы не хотите использовать библиотеку:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
person oskansavli    schedule 28.05.2016

Пример операции последовательности с использованием thenCombine на CompletableFuture

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){

    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());

    BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = 
            (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});

    BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;  

    return com.stream()
              .reduce(identity,
                      combineToList,
                      combineLists);  

   }
} 

Если вы не против использования сторонних библиотек, cyclops-react (я автор) имеет набор служебных методов для CompletableFutures (и Optionals, Streams и т. д.)

  List<CompletableFuture<String>> listOfFutures;

  CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);
person John McClean    schedule 19.09.2015

Чтобы добавить к принятому ответу @Misha, его можно расширить как сборщик:

 public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}

Теперь вы можете:

Stream<CompletableFuture<Integer>> stream = Stream.of(
    CompletableFuture.completedFuture(1),
    CompletableFuture.completedFuture(2),
    CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
person Jatin    schedule 14.06.2017

Заявление об ограничении ответственности: это не дает полного ответа на первоначальный вопрос. В нем не будет части «все выйдет из строя, если один выйдет из строя». Однако я не могу ответить на настоящий, более общий вопрос, потому что он был закрыт как дубликат этого: Java 8 CompletableFuture.allOf (...) с коллекцией или списком. Так что отвечу здесь:

Как преобразовать List<CompletableFuture<V>> в CompletableFuture<List<V>> с помощью потокового API Java 8?

Резюме: используйте следующее:

private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
    CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

    BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
        futureValue.thenCombine(futureList, (value, list) -> {
                List<V> newList = new ArrayList<>(list.size() + 1);
                newList.addAll(list);
                newList.add(value);
                return newList;
            });

    BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
        List<V> newList = new ArrayList<>(list1.size() + list2.size());
        newList.addAll(list1);
        newList.addAll(list2);
        return newList;
    });

    return listOfFutures.stream().reduce(identity, accumulator, combiner);
}

Пример использования:

List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
    .mapToObj(i -> loadData(i, executor)).collect(toList());

CompletableFuture<List<String>> futureList = sequence(listOfFutures);

Полный пример:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

public class ListOfFuturesToFutureOfList {

    public static void main(String[] args) {
        ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList();
        test.load(10);
    }

    public void load(int numThreads) {
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
            .mapToObj(i -> loadData(i, executor)).collect(toList());

        CompletableFuture<List<String>> futureList = sequence(listOfFutures);

        System.out.println("Future complete before blocking? " + futureList.isDone());

        // this will block until all futures are completed
        List<String> data = futureList.join();
        System.out.println("Loaded data: " + data);

        System.out.println("Future complete after blocking? " + futureList.isDone());

        executor.shutdown();
    }

    public CompletableFuture<String> loadData(int dataPoint, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            System.out.println("Starting to load test data " + dataPoint);

            try {
                Thread.sleep(500 + rnd.nextInt(1500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Successfully loaded test data " + dataPoint);

            return "data " + dataPoint;
        }, executor);
    }

    private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
        CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

        BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
            futureValue.thenCombine(futureList, (value, list) -> {
                    List<V> newList = new ArrayList<>(list.size() + 1);
                    newList.addAll(list);
                    newList.add(value);
                    return newList;
                });

        BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
            List<V> newList = new ArrayList<>(list1.size() + list2.size());
            newList.addAll(list1);
            newList.addAll(list2);
            return newList;
        });

        return listOfFutures.stream().reduce(identity, accumulator, combiner);
    }

}
person Kai Stapel    schedule 17.05.2018
comment
Вы должны использовать thenCombine() вместо thenApply() в аккумуляторе, чтобы избежать вызова join(). В противном случае вызывающий поток фактически выполнит это, поэтому коллекция вернется только после того, как все будет завершено. Вы можете проверить это, добавив печать перед futureList.join(): она печатается только после того, как все фьючерсы напечатают «Успешно загруженные тестовые данные». - person Didier L; 18.05.2018
comment
@DidierL Если я изменю thenApply() на thenCombine(), то последний join() вызов CompletableFuture<List<V>> больше не будет блокироваться, а немедленно вернется с пустым результатом. Таким образом, будущее списка не будет ждать, пока будут завершены все индивидуальные фьючерсы. Но это была первоначальная идея всего этого. - person Kai Stapel; 18.05.2018
comment
Да, действительно, я забыл, что Collector зависит от мутации. Проблема с вашим кодом в том, что он эквивалентен CompletableFuture.completedFuture(listOfFutures.stream().map(CompletableFuture::join).collect(toList()));. Коллекция фактически возвращает будущее, которое уже завершено, поэтому нет смысла возвращать future. - person Didier L; 18.05.2018
comment
Возможно, вы правы, что это функционально эквивалентно моему полному примеру. Однако этот пример предназначен только для иллюстрации того, как использовать сборщик toFutureList(). Не эквивалентны listOfFutures.stream().map(CompletableFuture::join).collect(toList()) и listOfFutures.stream().collect(toFutureList()). Первый дает вам полный результат со всеми завершенными фьючерсами, а второй дает вам будущее из списка значений, которые вы можете передать или сопоставить с другими значениями без блокировки. - person Kai Stapel; 18.05.2018
comment
Вот в чем вы ошибаетесь: последний делает то же самое. Ваш коллектор просто вызывает join() для всех фьючерсов в вызывающем потоке и помещает результат в уже завершенный CompletableFuture. Это блокировка. Как я сказал ранее, просто добавьте печать сразу после сбора потока, и вы увидите, что эта печать произойдет только после того, как все фьючерсы будут завершены. - person Didier L; 18.05.2018
comment
Да, ты прав. Я соответствующим образом обновил свой ответ. Спасибо что подметил это. - person Kai Stapel; 23.05.2018

В дополнение к библиотеке Spotify Futures вы можете попробовать мой код, расположенный здесь: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (имеет зависимости от других классов в том же пакете)

Он реализует логику для возврата «по крайней мере N из M» CompletionStage-s с политикой допустимого количества ошибок. Существуют удобные методы для всех / любых случаев, плюс политика отмены для оставшихся фьючерсов, плюс код имеет дело с CompletionStage-s (интерфейс), а не с CompletableFuture (конкретный класс).

person Valery Silaev    schedule 10.09.2016
comment
Обновление: предложенный код перемещен в отдельную библиотеку, github.com/vsilaev/tascalate-concurrent - person Valery Silaev; 25.07.2017
comment
Этот ответ бесполезен, если ссылка перестает работать. Пожалуйста, вставьте код в ответ. - person Simon Forsberg; 02.07.2019

Javaslang имеет очень удобный _ 1_ API. Это также позволяет сделать будущее коллекции из коллекции фьючерсов.

List<Future<String>> listOfFutures = ... 
Future<Seq<String>> futureOfList = Future.sequence(listOfFutures);

См. http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-

person Mathias Dpunkt    schedule 22.11.2016
comment
Мне нравится ответ. Но полагается на javaslang.concurrent.Future :( - person Jatin; 22.11.2016
comment
Это правда, но, поработав с javaslang Future, вы не хотите возвращаться к java Future или CompletableFuture. - person Mathias Dpunkt; 22.11.2016

Ваша задача может быть легко выполнена, например,

final List<CompletableFuture<Module> futures =...
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();
person Janitha Madushan    schedule 26.03.2020