Проверка интеграции Spring в потоке разделения и агрегирования

Я пытаюсь добавить фильтр, чтобы отбросить поток и продолжить выполнение основного потока даже после сбоя и агрегировать разделитель. ожидаемый тип как для ошибки, так и для успеха одинаков. конкретной логики агрегатора нет.

@Bean
public IntegrationFlow flow() {
     return f -> f.split(Orders.class, Orders::getItems)
    .enrich(e -> e.requestChannel("enrichChannel"))
    .filter(Order.class, c -> c.getId() > 10 ? true : false,
    e -> e.discardChannel(validationError()))
    .handle(new MyHandler())
    .transform(new MapToObjectTransformer(Order.class))
    .enrich(e -> e.requestChannel("transformChannel"))
    .filter(Order.class, c -> c.getTotal() > 100 ? true : false,
    e -> e.discardChannel(validationError())).handle( new transformer())
    .aggregate();
 }

@Bean
public IntegrationFlow validationErrorFlow() {
 return IntegrationFlows.from(validationError())
         .handle(new ValidationHandler())
         .get();
}

канал сброса не присоединяется к основному потоку для выполнения следующего элемента разделения.

Я могу написать сопоставление маршрута и подпотока, но оно станет слишком вложенным в маршрут -> подпотоки -> маршрут -> подпотоки, пытаясь решить эту проблему с помощью фильтров. есть ли лучший способ выполнить проверку и продолжить разделение для всех элементов в потоке.

Обновление 1:

.handle(request.class, (p, h) -> validator.validate(p)
.gateway("filterFlow.input")
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.aggregate();



@Bean
    public IntegrationFlow filterFlow() {
        return f -> f
                .filter(response.class, c -> c.isValidationStatus(), df -> df.discardFlow
                        (flow -> flow.handle(Message.class, (p, h) -> p.getPayload())));
    }

шлюз может перехватить запрос, но поток выполнил .handle(new MyHandler()), а не следующий элемент в split()

Обновление 2: (ответ) от Артема

.handle(request.class, (p, h) -> validator.validate(p))
    .filter(response.class,p -> p.isValidationStatus(), f -> f.discardChannel("aggregatorChannel"))
    .handle(new MyHandler())
    .enrich(...)
    .handle(...)
    .enrich(...)
    .handle(...)
    .enrich(...)
    .handle(...)
    .channel("aggregatorChannel")
    .aggregate();

Это приведет к условному пропуску и продолжению потока.


person kiran reddy    schedule 21.02.2018    source источник


Ответы (1)


канал сброса не присоединяется к основному потоку для выполнения следующего элемента разделения.

Это правда. Вот как это устроено. В большинстве случаев поток отбрасывания похож на очередь недоставленных писем в JMS. Итак, это короткая ветка с односторонним движением.

Если вы действительно хотите вернуться к основному потоку, вам следует подумать об использовании канала именования в определении потока. Я имею в виду в точке, где вы хотели бы вернуться после потока компенсации (сброса):

.filter(Order.class, c -> c.getId() > 10 ? true : false,
                    e -> e.discardFlow(sf -> sf
                            .gateway(validationError())
                            .channel("myHandleChannel")))
            .channel("myHandleChannel")
            .handle(new MyHandler())

Я использую gateway(), потому что нам нужен ответ от потока сброса для продолжения обработки. Нам нужен этот .channel("myHandleChannel") в конце подпотока, потому что поток отбрасывания - это ветвь.

Другой способ сделать это - использовать .gateway() в основном потоке:

.gateway("filterFlow.input")
.handle(new MyHandler())

...

@Bean
public IntegrationFlow filterFlow() {
    return f -> f
            .filter(Order.class, c -> c.getId() > 10 ? true : false,
                    e -> e.discardChannel(validationError()));
}

Мы отправляем на discardChannel такое же сообщение запроса, поэтому правильный заголовок replyChannel для упомянутого шлюза все еще существует. Единственное, что вам нужно, - это обеспечить правильный ответ от .handle(new ValidationHandler()).

person Artem Bilan    schedule 21.02.2018
comment
Понимаете. Это не читается. Пожалуйста, подумайте о том, чтобы отредактировать свой вопрос с правильно отформатированным кодом. Пожалуйста, уважайте помощь, которую вам оказывают. - person Artem Bilan; 21.02.2018
comment
Обновил вопрос, одно неверное нажатие на клавиатуру, запуталось форматирование. - person kiran reddy; 21.02.2018
comment
rather than the next item in split(). Звучит как другой вопрос, и он не совпадает с вашим другим предложением: the discard channel is not joining back to the main flow. Я ответил на это беспокойство. Иначе в вашем вопросе непонятно, что бы вы хотели увидеть. В конечном итоге вы предлагаете поведение по умолчанию: фильтр отбрасывает и возвращает управление обратно в основной поток для следующего разделенного элемента, если вы не используете throwExceptionOnRejection, но это не похоже. - person Artem Bilan; 22.02.2018
comment
да, на исходный вопрос дан ответ, но требовалось другое поведение, поток не должен выполняться после gateway и искать следующий элемент в разбиении и aggregate() в конце. Я попробовал throwExceptionOnRejection / exception в канале сброса, в этом случае весь поток полностью приостанавливается, а оставшиеся элементы в разделении отбрасываются. переход по маршруту разговора и подпотоку - единственный выбор для этого поведения, а не использование фильтра - person kiran reddy; 22.02.2018
comment
Что ж, может быть, у вас есть идея обратиться к агрегатору прямо в потоке сброса, минуя все эти этапы обработки? Дело в том, что когда мы используем фильтр со стандартным поведением, стандартный агрегатор не будет работать, потому что для группировки будет недостаточно разделенных элементов. Таким образом, вам определенно может потребоваться как-то вернуться к основному потоку из подпотока отбрасывания и просто в конечную точку агрегатора. С этой целью названный канал снова является для вас решением. - person Artem Bilan; 22.02.2018
comment
Да, именно это я и пытаюсь сделать. вот почему мой первый вопрос о возвращении из discardflow обратно в основной поток с полезной нагрузкой отлично работал с вашим образцом. Я пытаюсь понять, как перейти к aggregate(), пропуская все последующие шаги, или продолжить шаги, если фильтр вернул true. вы хотите сказать, что считайте названный канал из отброшенного потока в агрегат? - person kiran reddy; 22.02.2018
comment
Вы помещаете .channel("aggregatorChannel") перед .aggregate(), а в потоке отбрасывания используйте .channel("aggregatorChannel") для отправки результата потока отбрасывания непосредственно агрегатору. Таким образом, вы не должны использовать .gateway(), но мое первое предложение с .filter(). Вы знаете, что это другая история. Начнем с отдельного SO-вопроса! И считайте это исправленным. - person Artem Bilan; 22.02.2018
comment
Я только что исправил свой код, то, что вы предложили, довольно гладкое, я никогда не думал о размещении приемника discardchannel перед агрегатом, всегда понимал, что поток прервется и никогда не вернется к основному потоку, как только начнется сброс. отбрасывание и получение в одном непрерывном потоке :): D. как всегда огромное спасибо !! - person kiran reddy; 22.02.2018
comment
Ах! Задача была настолько проста - просто сбросить прямо в агрегатор! :-) - person Artem Bilan; 22.02.2018
comment
Правильно, никогда не забывайте, что между конечными точками есть каналы, и вы всегда можете отправлять сообщения на этот канал из любого места, включая поток отбрасывания. - person Artem Bilan; 22.02.2018