Spring Cloud Stream rabbitmq binder - обработка ошибок функции Spring Cloud

Я использую связыватель кролика весеннего облачного потока с функцией весеннего облака и определяю таких слушателей, как:

public Function<Flux<SomeObject>, Flux<OtherObject>> foo() {
//some code
}

Я также перенаправляю неудавшиеся сообщения в DLQ. Проблема в том, что происходит фатальная ошибка типа org.springframework.messaging.converter.MessageConversionException. Он не обрабатывается ConditionalRejectingErrorHandler, как указано в https://docs.spring.io/spring-amqp/reference/html/#exception-handling, и продолжает циклически повторяться.

Есть ли способ заставить это работать с ConditionalRejectingErrorHandler?

Сейчас я исправляю проблему, используя @ServiceActivator(inputChannel = "errorChannel") и сам исправляю ошибки.

Зависимости:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-consul-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-hateoas</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-web</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
</dependencies>

person kluzamic    schedule 27.02.2020    source источник


Ответы (1)


Мы долго обсуждали обработку ошибок и другие функции, которые мы используем для императивных функций, и то, как они применяются (или даже могут быть применены) к реактивным функциям, и пробовали несколько разных вещей, но, к сожалению, все сводится к несоответствию импеданса.

Подходы, которые вы описываете, основаны на работе с одним Сообщением. Это единица работы в обработчиках сообщений императивного стиля, таких как Function<String, String>. Вы используете реактивный стиль и тем самым изменили единицу работы с одного сообщения в потоке на весь поток.

Суммируя:

- Function<?, ?> - unit of work is Message
- Function<Flux<?>, Flux<?>> - unit of work is the entire stream

Вы также можете легко это заметить, поскольку реактивная функция вызывается только один раз на протяжении всего срока службы приложения, в то время как императивная функция вызывается один раз для каждого поступающего сообщения. Причина, по которой я говорю это, заключается в том, что основанные на фреймворке подходы, которые мы используем для императивных обработчиков сообщений (функций), не могут применяться к реактивным без возникновения побочных эффектов. И обычно реактивные разработчики понимают это, особенно учитывая богатство реактивного API, особенно в отношении обработки ошибок.

В любом случае мы соответствующим образом обновим документацию.

person Oleg Zhurakousky    schedule 27.02.2020
comment
Но он работает с @StreamListener("foo"), а затем с public void foo(Flux<Object> input) и не работает с Function<?, ?>, поэтому я подумал, что это скорее проблема облачной функции. В любом случае правильный подход @ServiceActivator(inputChannel = "errorChannel")? - person kluzamic; 28.02.2020
comment
Как я уже сказал, у нас он работал и с функцией, но, решая одну проблему, мы создали еще несколько. Например, что бы мы сказали опытным пользователям, которые хотят выполнять группировку, фильтрацию, агрегацию, управление окнами и т. Д. потоков? Все эти корпуса были полуразрушенными. Это реальные случаи, почему кто-то предпочел бы использовать реактивный, и в этот момент это становится проблемой. - person Oleg Zhurakousky; 29.02.2020