Обработка исключений поставщика Java 8 с помощью CompletableFuture

Рассмотрим следующий код

public class TestCompletableFuture {

    BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) {
        TestCompletableFuture testF = new TestCompletableFuture();
        testF.start();      
    }

    public void start() {
        Supplier<Integer> numberSupplier = new Supplier<Integer>() {
            @Override
            public Integer get() {
                return SupplyNumbers.sendNumbers();                     
            }
        };
        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);         
    }       
}

class SupplyNumbers {
    public static Integer sendNumbers(){
        return 25; // just for working sake its not  correct.
    }
}

Вышеупомянутая вещь работает нормально. Однако в моем случае sendNumbers мог также вызвать проверенное исключение, например:

class SupplyNumbers {
    public static Integer sendNumbers() throws Exception {
        return 25; // just for working sake its not  correct.
    }
}

Теперь я хочу обработать это исключение как y в моем biConsumer. Это поможет мне обработать результат, а также исключение (если есть) внутри одной функции (biConsumer).

Любые идеи? Могу я использовать здесь CompletableFuture.exceptionally(fn) или что-нибудь еще?


person ayush    schedule 10.03.2015    source источник
comment
Вот решение, которое позволяет использовать проверенные исключения без снижения читабельности вашего кода: stackoverflow.com/a/49705336/14731   -  person Gili    schedule 07.04.2018


Ответы (5)


Заводские методы, использующие стандартные функциональные интерфейсы, бесполезны, когда вы хотите обрабатывать отмеченные исключения. Когда вы вставляете код, перехватывающий исключение, в лямбда-выражение, возникает проблема, заключающаяся в том, что предложению catch требуется экземпляр CompletableFuture для установки исключения, в то время как фабричному методу требуется Supplier, курица-и-яйцо.

Вы можете использовать поле экземпляра класса, чтобы разрешить мутацию после создания, но в конечном итоге полученный код не будет чистым и сложным, чем прямое решение на основе Executor. В документации CompletableFuture говорится:

  • Все методы async без явного аргумента Executor выполняются с использованием _ 5_

Итак, вы знаете, что следующий код покажет стандартное поведение CompletableFuture.supplyAsync(Supplier) при прямой обработке проверенных исключений:

CompletableFuture<Integer> f=new CompletableFuture<>();
ForkJoinPool.commonPool().submit(()-> {
  try { f.complete(SupplyNumbers.sendNumbers()); }
  catch(Exception ex) { f.completeExceptionally(ex); }
});

В документации также говорится:

… Чтобы упростить мониторинг, отладку и отслеживание, все сгенерированные асинхронные задачи являются экземплярами интерфейса маркера _ 8_.

Если вы хотите придерживаться этого соглашения, чтобы решение еще больше походило на исходный метод supplyAsync, измените код на:

CompletableFuture<Integer> f=new CompletableFuture<>();
ForkJoinPool.commonPool().submit(
  (Runnable&CompletableFuture.AsynchronousCompletionTask)()-> {
    try { f.complete(SupplyNumbers.sendNumbers()); }
    catch(Exception ex) { f.completeExceptionally(ex); }
});
person Holger    schedule 10.03.2015
comment
не могли бы вы объяснить, что это значит? (Runnable и CompletableFuture.AsynchronousCompletionTask) - person Jasonw; 04.11.2015
comment
@Jasonw: это приведение к типу пересечения. Другими словами, объект должен реализовывать оба типа, Runnable и CompletableFuture.AsynchronousCompletionTask. Поскольку приведение типов обеспечивает тип контекста для лямбда-выражений, это означает, что сгенерированный экземпляр лямбда будет реализовывать оба интерфейса. См. Также здесь - person Holger; 04.11.2015

Вы уже перехватываете исключение в y. Может быть, вы не видели его, потому что main вышел из игры до того, как завершилась ваша CompletableFuture?

Приведенный ниже код выводит «null» и «Hello», как и ожидалось:

public static void main(String args[]) throws InterruptedException {
  TestCompletableFuture testF = new TestCompletableFuture();
  testF.start();
  Thread.sleep(1000); //wait for the CompletableFuture to complete
}

public static class TestCompletableFuture {
  BiConsumer<Integer, Throwable> biConsumer = (x, y) -> {
    System.out.println(x);
    System.out.println(y);
  };
  public void start() {
    CompletableFuture.supplyAsync(SupplyNumbers::sendNumbers)
            .whenComplete(biConsumer);
  }
}

static class SupplyNumbers {
  public static Integer sendNumbers() {
    throw new RuntimeException("Hello");
  }
}
person assylias    schedule 10.03.2015

Я не совсем понимаю, чего вы пытаетесь достичь. Если ваш поставщик генерирует исключение, когда вы вызываете testFuture .get(), вы получите java.util.concurrent.ExecutionException, вызванное любым исключением, сгенерированным поставщиком, которое вы можете получить, вызвав getCause() на ExecutionException.

Или, как вы упомянули, вы можете использовать exceptionally в CompletableFuture. Этот код:

public class TestCompletableFuture {

    private static BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) throws Exception {
        Supplier<Integer> numberSupplier = () -> {
            throw new RuntimeException(); // or return integer
        };

        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier)
                .whenComplete(biConsumer)
                .exceptionally(exception -> 7);

        System.out.println("result = " + testFuture.get());
    }

}

Распечатывает этот результат:

null
java.util.concurrent.CompletionException: java.lang.RuntimeException
result = 7

РЕДАКТИРОВАТЬ:

Если вы отметили исключения, вы можете просто добавить try-catch.

Исходный код:

Supplier<Integer> numberSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        return SupplyNumbers.sendNumbers();                     
    }
};

Измененный код:

Supplier<Integer> numberSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        try {
            return SupplyNumbers.sendNumbers();                     
        } catch (Excetpion e) {
            throw new RuntimeExcetpion(e);
        }
    }
};
person Jaroslaw Pawlak    schedule 10.03.2015
comment
OP надеялся, что сможет обработать исключение в коде, который вызывает поставщика, а не внутри самого поставщика. Если вы всегда собираетесь обрабатывать исключение согласованным образом, будет неуклюже заставлять пытаться / уловить каждого поставщика, которого вы пишете. - person skelly; 07.06.2019

Возможно, вы могли бы использовать новый объект, чтобы обернуть свое целое число и ошибку следующим образом:

public class Result {

    private Integer   integer;
    private Exception exception;

    // getter setter

}

А потом:

public void start(){
    Supplier<Result> numberSupplier = new Supplier<Result>() {
        @Override
        public Result get() {
            Result r = new Result();
            try {
                r.setInteger(SupplyNumbers.sendNumbers());
            } catch (Exception e){
                r.setException(e);
            }
            return r;

        }
    };
    CompletableFuture<Result> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);
}
person dieter    schedule 10.03.2015

Просто оберните проверенное исключение в CompletionException

Еще один момент, который следует учитывать при обработке исключений в CompletableFuture при использовании completeExceptionally(), заключается в том, что точное исключение будет доступно в handle() и whenComplete(), но оно будет заключено в CompletionException при вызове join() или когда оно будет перенаправлено на любой последующий этап.

Таким образом, handle() или exceptionally(), примененные к нижестоящему этапу, увидят CompletionException вместо исходного, и ему придется искать его причину, чтобы найти исходное исключение.

Более того, любой RuntimeException, созданный любой операцией (включая supplyAsync()), также заключен в CompletionException, за исключением случаев, когда это уже CompletionException.

Учитывая это, лучше перестраховаться и разрешить обработчикам исключений разворачивать CompletionExceptions.

Если вы это сделаете, больше нет смысла устанавливать точное (проверенное) исключение для CompletableFuture и гораздо проще обернуть проверенные исключения в CompletionException напрямую:

Supplier<Integer> numberSupplier = () -> {
    try {
        return SupplyNumbers.sendNumbers();
    } catch (Exception e) {
        throw new CompletionException(e);
    }
};

Чтобы сравнить этот подход с подходом Хольгера, я адаптировал ваш код с двумя решениями (simpleWrap() - это выше, customWrap() - это код Хольгера. ):

public class TestCompletableFuture {

    public static void main(String args[]) {
        TestCompletableFuture testF = new TestCompletableFuture();
        System.out.println("Simple wrap");
        testF.handle(testF.simpleWrap());
        System.out.println("Custom wrap");
        testF.handle(testF.customWrap());
    }

    private void handle(CompletableFuture<Integer> future) {
        future.whenComplete((x1, y) -> {
            System.out.println("Before thenApply(): " + y);
        });
        future.thenApply(x -> x).whenComplete((x1, y) -> {
            System.out.println("After thenApply(): " + y);
        });
        try {
            future.join();
        } catch (Exception e) {
            System.out.println("Join threw " + e);
        }
        try {
            future.get();
        } catch (Exception e) {
            System.out.println("Get threw " + e);
        }
    }

    public CompletableFuture<Integer> simpleWrap() {
        Supplier<Integer> numberSupplier = () -> {
            try {
                return SupplyNumbers.sendNumbers();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
        return CompletableFuture.supplyAsync(numberSupplier);
    }

    public CompletableFuture<Integer> customWrap() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        ForkJoinPool.commonPool().submit(
                (Runnable & CompletableFuture.AsynchronousCompletionTask) () -> {
                    try {
                        f.complete(SupplyNumbers.sendNumbers());
                    } catch (Exception ex) {
                        f.completeExceptionally(ex);
                    }
                });
        return f;
    }
}

class SupplyNumbers {
    public static Integer sendNumbers() throws Exception {
        throw new Exception("test"); // just for working sake its not  correct.
    }
}

Вывод:

Simple wrap
After thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Before thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Join threw java.util.concurrent.CompletionException: java.lang.Exception: test
Get threw java.util.concurrent.ExecutionException: java.lang.Exception: test
Custom wrap
After thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Before thenApply(): java.lang.Exception: test
Join threw java.util.concurrent.CompletionException: java.lang.Exception: test
Get threw java.util.concurrent.ExecutionException: java.lang.Exception: test

Как вы заметите, единственная разница в том, что whenComplete() видит исходное исключение перед thenApply() в случае customWrap(). После thenApply() и во всех остальных случаях исходное исключение переносится.

Самое удивительное, что get() развернет CompletionException в случае Simple wrap и заменит его на ExecutionException.

person Didier L    schedule 09.08.2018
comment
Ключевым моментом здесь является бросание CompletionException обертки, которая заставляет все приближаться к Future<?> духу ... - person Matthieu; 20.01.2021