Я пытаюсь добавить фильтр, чтобы отбросить поток и продолжить выполнение основного потока даже после сбоя и агрегировать разделитель. ожидаемый тип как для ошибки, так и для успеха одинаков. конкретной логики агрегатора нет.
@Bean
public IntegrationFlow flow() {
return f -> f.split(Orders.class, Orders::getItems)
.enrich(e -> e.requestChannel("enrichChannel"))
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardChannel(validationError()))
.handle(new MyHandler())
.transform(new MapToObjectTransformer(Order.class))
.enrich(e -> e.requestChannel("transformChannel"))
.filter(Order.class, c -> c.getTotal() > 100 ? true : false,
e -> e.discardChannel(validationError())).handle( new transformer())
.aggregate();
}
@Bean
public IntegrationFlow validationErrorFlow() {
return IntegrationFlows.from(validationError())
.handle(new ValidationHandler())
.get();
}
канал сброса не присоединяется к основному потоку для выполнения следующего элемента разделения.
Я могу написать сопоставление маршрута и подпотока, но оно станет слишком вложенным в маршрут -> подпотоки -> маршрут -> подпотоки, пытаясь решить эту проблему с помощью фильтров. есть ли лучший способ выполнить проверку и продолжить разделение для всех элементов в потоке.
Обновление 1:
.handle(request.class, (p, h) -> validator.validate(p)
.gateway("filterFlow.input")
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.aggregate();
@Bean
public IntegrationFlow filterFlow() {
return f -> f
.filter(response.class, c -> c.isValidationStatus(), df -> df.discardFlow
(flow -> flow.handle(Message.class, (p, h) -> p.getPayload())));
}
шлюз может перехватить запрос, но поток выполнил .handle(new MyHandler())
, а не следующий элемент в split()
Обновление 2: (ответ) от Артема
.handle(request.class, (p, h) -> validator.validate(p))
.filter(response.class,p -> p.isValidationStatus(), f -> f.discardChannel("aggregatorChannel"))
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.channel("aggregatorChannel")
.aggregate();
Это приведет к условному пропуску и продолжению потока.