Java 8 поддерживает порядок потоков с помощью CompletableFuture::join

У меня есть входной поток запросов, которые выполняются асинхронно. Я хочу убедиться, что когда я использую Completablefuture::join, результат этих требований собирается в порядке входного потока запроса.

Вот как выглядит мой код:

queries.stream()
     .map(query -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return SQLQueryEngine.execute(query);
                    } catch (InternalErrorException e) {
                        throw new RuntimeException(e);
                    }
     }))
     .map(CompletableFuture::join)
     .collect(Collectors.toList());

SQLQueryEngine.execute(запрос); возвращает List<Results>, поэтому вывод равен List<List<Result>. Я хочу сгладить и объединить все результаты в один список. Если я использую .flatMap(List::stream) перед выравниванием коллекции, сохранит ли она порядок?


person premprakash    schedule 27.10.2015    source источник
comment
Миша уже дал ответ. В качестве примечания имейте в виду, что вы выполняете запросы последовательно, ожидая ответа каждого CompletableFuture один за другим (присоединяйтесь к карте). Если вы хотите обрабатывать их одновременно, сопоставьте поток с List из CompletableFutures и вызовите для них allOf. Пример здесь   -  person Ruben    schedule 27.10.2015
comment
Спасибо @Ruben за указание на это :)   -  person premprakash    schedule 27.10.2015


Ответы (1)


Вероятно, вы имели в виду .flatMap, и да, порядок сохранится.

Подумайте о том, чтобы явно передать Executor в supplyAsync, чтобы избежать планирования SQL-запросов, связанных с вводом-выводом, в ForkJoinPool.commonPool().

Как отметил @Ruben , вы присоединяетесь к каждому задача в текущем потоке сразу после ее отправки и перед отправкой следующего запроса, что, вероятно, является ошибкой. Вы должны сначала отправить все запросы и только потом начать присоединяться.

Вы можете сделать это так (со статическим импортом toList):

queries.stream()
    .map(query -> CompletableFuture.supplyAsync(...))
    .collect(toList())
    .stream()
    .map(CompletableFuture::join)
    .collect(toList());
person Misha    schedule 27.10.2015
comment
Код в его нынешнем виде не будет сильно загрязнять общий пул, так как он join проверяет каждое будущее сразу после его отправки. - person Holger; 27.10.2015
comment
Верно, если предположить, что это единственный поток и ничего больше не происходит, что могло бы планировать задачи в общий пул. Даже если для конкретного приложения нет ничего плохого в том, чтобы запускать блокирующие задачи в общем пуле, это просто вредная привычка. - person Misha; 27.10.2015
comment
Верно, тем более, что немедленное присоединение к каждой задаче не кажется преднамеренным. Так что с этой точки зрения мой комментарий был скорее сарказмом… - person Holger; 27.10.2015
comment
Видимо, мой детектор сарказма отключился из-за недостаточного потребления кофеина. Я думаю, что лучше включить комментарий @Ruben в ответ. - person Misha; 27.10.2015
comment
Спасибо @миша. Я не понимаю, что выполнял запросы последовательно. И да, я имел в виду flatMap. Это очень познавательно. Я считаю, что ваше решение сделать .collect().stream() до .map(Completable::join) имеет тот же эффект, что и использование CompletableFuture::allOf. Также еще один вопрос о вашем предложении передать Executor в SupplyAsyc, чтобы избежать загрязнения общего пула, есть ли рекомендуемый способ, как это сделать? stackoverflow.com/ questions/21163108/, предлагает создать новый пул ветвлений и передать его для обеспечения асинхронности - person premprakash; 27.10.2015
comment
Да, он будет иметь тот же эффект, что и allOf, но он короче и достаточно хорош, потому что вы присоединяетесь прямо к текущему потоку. allOf позволит вам ставить в очередь дальнейшие вычисления без объединения, но вам это не нужно. Что касается того, какой Executor передать supplyAsync, то java.util.concurrent.Executors имеет фабричные методы для создания различных Исполнителей. Если у вас есть вопросы о выборе наиболее подходящего Executor, задайте новый вопрос. - person Misha; 27.10.2015
comment
Спасибо @misha, я разместил здесь новый вопрос stackoverflow.com/questions/33377177/ - person premprakash; 27.10.2015
comment
@Misha .. разве это не блокирует одну за другой .. Таким образом, мы получим сумму всех задержек, а не Max (Latencies) - person so-random-dude; 08.04.2020