Как обрабатывать ошибку при выполнении Flux.map ()

Я пытаюсь понять, как обрабатывать ошибки при отображении элементов внутри Flux.

Например, я разбираю строку CSV в одном из своих бизнес-объектов POJO:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));

Некоторые из этих строк могут содержать ошибки, поэтому в журнале я получаю следующее:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo

Я читал в API некоторые методы обработки ошибок, но большинство из них касалось возврата «значения ошибки» или использования резервного Flux, например этого:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);

Однако использование этого с моим myflux означает, что весь поток обрабатывается снова.

Итак, есть ли способ обрабатывать ошибки при обработке определенных элементов (т.е. игнорировать их / регистрировать их) и продолжать обработку остальной части потока?

ОБНОВЛЕНИЕ с помощью обходного пути @akarnokd

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list of raw quotes string in a new Flux<String>
    .flatMap(list -> Flux.fromIterable(list))
    // Convert the string to POJOs
    .flatMap(x -> {
            try {
                return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));    
            }
            catch (IllegalArgumentException ex){
                System.out.println("Error decoding stock quotation: " + x);
                return Flux.empty();
            }
    });

    return processingFlux;
}

Однако это работает как прелесть, поскольку, как видите, код стал менее элегантным, чем раньше. Разве у Flux API нет способа делать то, что делает этот код?

retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)

person Victor    schedule 26.03.2016    source источник
comment
Вы можете использовать настраиваемое исключение, которое может включать в себя отказавший элемент в качестве переменной. Затем в методе onError вы можете получить отказавший элемент с помощью метода получения в настраиваемом исключении.   -  person Albin    schedule 24.03.2021


Ответы (5)


Вместо этого вам понадобится flatMap, что позволит вам вернуть пустую последовательность, если обработка не удалась:

myflux.flatMap(v -> {
    try {
        return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
    } catch (IllegalArgumentException ex) {
        return Flux.empty();
    }
});
person akarnokd    schedule 30.03.2016
comment
Отлично работает (приму этот ответ), но я хотел бы знать, можно ли это сделать с помощью API. Если нет, я открою запрос функции. Спасибо! - person Victor; 30.03.2016
comment
Это стандартный API де-факто для выполнения такого поведения. Ошибки - это терминальные события, и вы должны преобразовать их во что-то еще в лямбдах, чтобы избежать завершения. - person akarnokd; 30.03.2016
comment
Ok. Я предложил создать новый метод обработки индивидуальных отказов (возможно, опубликовать эти отказы как поток мертвых писем?). Может быть, это может быть полезно ... - person Victor; 30.03.2016

Если вы хотите использовать методы Reactor 3 для обработки исключений, вы можете использовать Mono.fromCallable.

flatMap(x -> 
    Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
        .flux()
        .flatMap(Flux::fromIterable)
        .onErrorResume(Flux::empty)
)

К сожалению, Flux.fromCallable нет, поэтому, предполагая, что вызываемый объект возвращает список, вам придется вручную преобразовать его в Flux.

person Tsvetan Ovedenski    schedule 26.05.2018

В текущей версии Reactor 3 было добавлено довольно много методов. Итак, мы могли бы сделать что-то вроде этого:

Flux.onErrorResume(error -> { 
        System.out.println("Error decoding stock quotation: " + e);
        return Flux.empty();
    });

Дополнительную информацию о том, как обрабатывать ошибки, см. здесь

person code4kix    schedule 22.05.2018

Вы можете использовать onErrorContinue. Это позволяет исправить ошибки, отбросив элемент неисправности и продолжив работу с последующими элементами.

person seanzxx    schedule 08.01.2019

person    schedule
comment
Использование just означает, что вычисления не будут выполняться внутри Mono / Flux, поэтому он не будет обрабатывать ошибку. - person Tsvetan Ovedenski; 30.06.2020