Как в java обработать CompletableFutures и получить первый желаемый результат?

Обычно с CompletableFuture я бы вызвал thenApply или какой-нибудь другой метод, чтобы что-то сделать, как только станет доступен результат. Однако теперь у меня есть ситуация, когда я хочу обрабатывать результаты, пока не получу положительный результат, а затем игнорировать все дальнейшие результаты.

Если бы я просто хотел получить первый доступный результат, я мог бы использовать CompletableFuture.anyOf (хотя я ненавижу преобразовывать список в массив только для вызова anyOf). Но я не этого хочу. Я хочу получить первый результат, и если он не дает желаемого результата, я хочу обработать второй доступный результат и так далее, пока не получу желаемый результат.

Вот простой пример, который просматривает все результаты и возвращает первое найденное значение, которое больше 9. (Обратите внимание, что это не моя настоящая задача. Это всего лишь простой пример).

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    for(CompletableFuture<Integer> result : results) {
        Integer v = result.get();
        if(v > 9)
            return v;
    }
    return null;
}

Конечно, этот пример показывает результаты с самого начала, а не рассматривает результаты по мере их завершения. Итак, вот тот, который выполняет то, что я хочу, но с гораздо более сложным кодом.

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    AtomicInteger finalResult = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(results.size());
    for(CompletableFuture<Integer> result : results) {
        result.whenComplete((v,e) -> {
            if(e!=null) {
                Logger.getLogger(getClass()).error("",e);
            } else if(v > 9) {
                finalResult.set(v);
                while(latch.getCount() > 0)
                    latch.countDown();
                return;
            }
            latch.countDown();
        });
    }
    latch.await();

    if(finalResult.get() > 9)
        return finalResult.get();
    return null;
}    

Есть ли API, где я могу это сделать?

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    Iterator<Integer> resultIt = getResultsAsAvailable(results);
    for(; resultIt.hasNext();) {
        Integer v = resultIt.next();
        if(v > 9)
            return v;
    }
    return null;
}

Или даже лучше:

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    return getFirstMatch(results, r -> {return r > 9;});
}

person HappyEngineer    schedule 14.04.2016    source источник


Ответы (2)


Вы можете использовать следующее решение:

public static <T> CompletableFuture<T> anyMatch(
    List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {

    CompletableFuture<T> result=new CompletableFuture<>();
    Consumer<T> whenMatching=v -> { if(criteria.test(v)) result.complete(v); };
    CompletableFuture.allOf(l.stream()
        .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new))
    .whenComplete((ignored, t) ->
        result.completeExceptionally(t!=null? t: new NoSuchElementException()));
    return result;
}

Основной принцип тот же, что и в ответе Pillar, однако есть некоторые отличия:

  • Общая подпись более гибкая.
  • Создание массива, необходимого для CompletableFuture.allOf, совмещено с регистрацией последующего действия для исходных фьючерсов. В качестве побочного эффекта обработчик действия allOf зависит от завершения всех попыток завершить результат, а не только от исходных фьючерсов. Это делает действительно желаемую зависимость явной. Таким образом, это сработает даже, если мы заменим все thenAccept на thenAcceptAsync.
  • Это решение завершается NoSuchElementException, а не возвращает null в случае, если ни один результат не соответствует критериям. Если хотя бы одно будущее завершилось в исключительном порядке и нет успешного завершения с совпадающим результатом, передается одно из возникших исключений.

Вы можете попробовать это с

List<CompletableFuture<Integer>> list=Arrays.asList(
    CompletableFuture.supplyAsync(()->5),
    CompletableFuture.supplyAsync(()->{throw new RuntimeException(); }),
    CompletableFuture.supplyAsync(()->42),
    CompletableFuture.completedFuture(0)
);
anyMatch(list, i -> i>9)
    .thenAccept(i->System.out.println("got "+i))
    // optionally chain with:
    .whenComplete((x,t)->{ if(t!=null) t.printStackTrace(); });
person Holger    schedule 14.04.2016

Я не знаю ни одного такого API в JDK или где-либо еще. Вы можете сами свернуть.

Вы можете воспользоваться тем, что CompletableFuture#completecompleteExceptionally) ничего не делает, если будущее уже завершено.

Если еще не завершено, устанавливает значение, возвращаемое get() и связанными методами, равным заданному значению.

Создайте новый окончательный результат CompletableFuture. Добавьте продолжение к каждому из ваших фьючерсов, которое пытается complete получить этот окончательный результат, если ваше условие применяется. Это будущее завершится первым успехом. Однако, если ничего не получится, в результате вам, по-видимому, понадобится null. Вы можете создать CompletableFuture с allOf, чтобы также попытаться complete окончательный результат с null.

Что-то типа

public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) {
    CompletableFuture<T> finalResult = new CompletableFuture<>();
    // attempt to complete on success
    futures.stream().forEach(future -> future.thenAccept(successResult -> {
        if (condition.test(successResult))
            finalResult.complete(successResult);
    }));
    CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    all.thenRun(() -> {
        finalResult.complete(null);
    });
    return finalResult;
}

Вы оплачиваете накладные расходы на вызовы без операции.

При необходимости вы можете изменить null на какое-либо значение по умолчанию или по-другому обрабатывать исключения (completeExceptionally при возникновении ошибки). Вам нужно будет использовать whenComplete или handle вместо thenAccept выше, чтобы получить доступ к Exception.

person Savior    schedule 14.04.2016