Канал настраиваемых ошибок DSL Spring Integration не работает

Я использую DSL-реализацию Spring Integration. У меня есть код ниже, и я не могу использовать свой собственный поток ошибок. Когда метод аутентификации выдает исключение времени выполнения, errorChannel начинает обработку. Я дополняю заголовок, чтобы использовать собственный поток ошибок, но не использую.

// In Class - 1
 @Bean
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {

        MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway();
        wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
        wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
        wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorChannel"));
        wsInboundGateway.setMarshaller(marshaller);
        wsInboundGateway.setUnmarshaller(marshaller);
        return wsInboundGateway;
    }


// In Class - 2
@Bean
    public IntegrationFlow incomingRequest() {
        return f -> f.<Object, Class<?>>route(t -> t.getClass(),
                mapping -> mapping.subFlowMapping(payloadType1(),
                        sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
                        .subFlowMapping(payloadType2(),
                                sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
                        conf -> conf.id("router:Incoming request router"));
    }

// In Class - 3
    @Bean
    public IntegrationFlow type1() {
        IntegrationFlow integrationFlow = f -> f
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "error222", true))
                .<Type1>handle((p, h) -> authentication.authenticate(p),
                        conf -> conf.id("service-activator:Authenticate"))
                .transform(transformer::transformType1MsgToDataX,
                        conf -> conf.id("transform:Unmarshall type1 Message"))
                .enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
                        .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
                .handle((GenericHandler<DataX>) repository::successResponseMessage,
                        conf -> conf.id("service-activator:return success"))
                .channel("outgoingResponse.input")
                ;

        return integrationFlow;
    }

// In Class - 3
@Bean
    public IntegrationFlow error222Flow() {

        return IntegrationFlows.from("error222").handle("repository", "failureResponseMessage").get()

                ;

    }

РЕДАКТИРОВАТЬ:

После ответов Артема мой код, как показано ниже. Но все же я не могу получить доступ к параметру заголовка в потоке ошибок. Я получаю сообщение об ошибке - "Маршрутизатор" маршрутизатор не разрешил канал: подготовка ответа об ошибке ""

 // In Class - 1
 @Bean
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {

        MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway();
        wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
        wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
        wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorResponse.input"));
        wsInboundGateway.setMarshaller(marshaller);
        wsInboundGateway.setUnmarshaller(marshaller);
        return wsInboundGateway;
    }


// In Class - 2
@Bean
    public IntegrationFlow incomingRequest() {
        return f -> f.<Object, Class<?>>route(t -> t.getClass(),
                mapping -> mapping.subFlowMapping(payloadType1(),
                        sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
                        .subFlowMapping(payloadType2(),
                                sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
                        conf -> conf.id("router:Incoming request router"));
    }

// In Class - 2
@Bean 
public IntegrationFlow errorResponse(){ 
    return f -> f.<MessageHandlingException, Object>route(t -> t.getFailedMessage().getHeaders().get("ABCDEF"), 
                        mapping -> mapping.subFlowMapping("ABCDEF", 
                                sf -> sf.gateway("customError.input", ConsumerEndpointSpec::transactional)), 
                                conf -> conf.id("router:error response prepare"));
}

// In Class - 3
    @Bean
    public IntegrationFlow type1() {
        IntegrationFlow integrationFlow = f -> f
                .enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true)) 
                .<Type1>handle((p, h) -> authentication.authenticate(p),
                        conf -> conf.id("service-activator:Authenticate"))
                .transform(transformer::transformType1MsgToDataX,
                        conf -> conf.id("transform:Unmarshall type1 Message"))
                .enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
                        .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
                .handle((GenericHandler<DataX>) repository::successResponseMessage,
                        conf -> conf.id("service-activator:return success"))
                .channel("outgoingResponse.input")
                ;

        return integrationFlow;
    }

// In Class - 3
@Bean
    public IntegrationFlow customError(){
        return f -> f.handle((GenericHandler<MessageHandlingException>)eventRepository::failureResponseMessage,
                                conf -> conf.id("service-activator:return failure"));
    }

РЕДАКТИРОВАТЬ - 2:

Пробую тестовый код Артема, по такому сценарию работает. Если я конвертирую поток type1 в отображение подпотока, как показано ниже (я делаю это, потому что сомневаюсь в своем блоке кода подпотока), поток ошибок не сможет распечатать значение параметра ABCDEF. После этого я добавляю еще один заголовок (XYZTWR) в сопоставление подпотока, но его тоже нельзя распечатать.

@Bean
public IntegrationFlow type1() {
    return f -> f.<String, String>route(t -> t.toString(), mapping -> mapping.subFlowMapping("foo",
            sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional).enrichHeaders(h -> h.header("XYZTRW", "XYZTRW", true))));
}

@Bean
public IntegrationFlow fooFlow() {
    return f -> f.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
            .handle((p, h) -> {
                throw new RuntimeException("intentional");
            });
}

Мой S.OUT:

GenericMessage [payload=foo, headers={history=testGateway,type1.input, id=1fad7a65-4abe-c41d-0b22-36839a103269, timestamp=1503029553071}]

person gbalcisoy    schedule 17.08.2017    source источник
comment
Я использую весеннюю интеграцию 4.3.11 и весеннюю интеграцию java dsl 1.2.2.   -  person gbalcisoy    schedule 17.08.2017


Ответы (1)


Заголовок errorChannel начинает работу, когда мы передаем сообщение другому исполнителю потока или каналу очереди. В противном случае стандартные throw и try...catch работают в одном стеке вызовов.

Итак, в вашем случае исключение аутентификации просто выбрасывается вызывающей стороне - WS Inbound Gateway. И здесь вы настроили глобальный канал ошибок.

Я сделал это тестирование:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public static class ContextConfiguration {

    @Bean
    public IntegrationFlow errorResponse() {
        return IntegrationFlows.from(errorChannel())
                    .<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
                            e -> e.poller(p -> p.fixedDelay(100)))
                    .get();
    }

    @Bean
    public IntegrationFlow type1() {
            return f -> f
                    .enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
                    .handle((p, h) -> { throw new RuntimeException("intentional"); });
    }

    @Bean
    public PollableChannel errorChannel() {
        return new QueueChannel();
    }
}

@MessagingGateway(errorChannel = "errorChannel", defaultRequestChannel = "type1.input")
public interface TestGateway {

    Message<?> sendTest(String payload);

}

...

@Autowired
private TestGateway testGateway;

@Test
public void testErrorChannel() {
    Message<?> message = this.testGateway.sendTest("foo");
    System.out.println(message);
}

И мой СОУТ показывает мне:

GenericMessage [payload=foo, headers={ABCDEF=ABCDEF, id=ae5d2d44-46b7-912d-17d4-bf2ee656140a, timestamp=1502999446725}]

Пожалуйста, сделайте уровень ведения журнала DEBUG для категории org.springframework.integration и посмотрите, на каком этапе ваше сообщение теряет желаемые заголовки.

ОБНОВЛЕНИЕ

OK. Я вижу твою проблему. Поскольку вы используете sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional), другими словами, вы вызываете нисходящий поток через шлюз, все, что вы там сделали, остается за дверью, и в случае ошибки вы можете получить обратно только то, что вы отправляете туда - сообщение запроса шлюза. Нисходящий поток failedMessage по умолчанию проглатывается.

Чтобы решить эту проблему, вам следует рассмотреть возможность добавления errorChannel() параметра для этого .gateway() и обработать там ошибку нисходящего потока. Или ... просто не используйте .gateway() в подпотоке маршрутизатора, а просто channel отображение.

.transactional() можно настроить также на любой .handle().

person Artem Bilan    schedule 17.08.2017
comment
Спасибо. Я понимаю, но мой другой вопрос заключается в том, что я не могу передать параметр (например, значение карты заголовка) в глобальный канал ошибок, чтобы определить в нем мою настраиваемую конфигурацию. - person gbalcisoy; 17.08.2017
comment
Вместо этого вы можете просто использовать собственный канал ошибок на уровне MarshallingWebServiceInboundGateway. - person Artem Bilan; 17.08.2017
comment
Если бы я мог передать параметр заголовка в глобальный канал ошибок, это помогло бы мне, но я не могу. Есть ли способ для этого? Мне нужна информация об имени начального потока для определения процесса глобального канала ошибки. - person gbalcisoy; 17.08.2017
comment
Вы действительно можете использовать .enrichHeaders() в этом вопросе. Когда исключение происходит ниже по потоку, оно переносится в MessagingException с помощью свойства failedMessage. Именно у этого сообщения есть желаемый заголовок. ErrorMessage, отправленный в errorChannel, содержит этот MessagingException в качестве полезной нагрузки. Итак, что вам нужно, так это передать полезную нагрузку в MessagingException и вызвать его getFailedMessage(), чтобы получить доступ к сообщению с запросом о нарушении, чтобы перейти к его заголовкам для желаемого варианта. - person Artem Bilan; 17.08.2017
comment
После вашего 1-го ответа я заменил свое первое обогащение в потоке type1, как показано ниже: .enrichHeaders (h - ›h.header (ABCDEF, ABCDEF, true)), и мой поток канала ошибок таков: @Bean public IntegrationFlow errorResponse () {return f - ›f.‹ MessageHandlingException, Object ›route (t -› t.getFailedMessage (). getHeaders (). get (ABCDEF), отображение - ›mapping.subFlowMapping (ABCDEF, sf -› sf.gateway (customError.input, ConsumerEndpointSpec :: transactional)), conf - ›conf.id (маршрутизатор: подготовка ответа об ошибке)); но я получаю сообщение об ошибке: "Маршрутизатор не разрешил канал:" router: error response prepare " - person gbalcisoy; 17.08.2017
comment
Итак, я не могу получить доступ к своему параметру заголовка - person gbalcisoy; 17.08.2017
comment
Вы должны указать этот новый код в своем вопросе. Сложно прочитать это по комментариям. В этом вопросе просто простая функция EDIT. - person Artem Bilan; 17.08.2017
comment
Пожалуйста, ознакомьтесь с моим ответом со слов: I did this testing: - person Artem Bilan; 17.08.2017
comment
Артем, я попробовал и отредактировал свой вопрос как РЕДАКТИРОВАТЬ - 2. Спасибо. - person gbalcisoy; 18.08.2017
comment
См. ОБНОВЛЕНИЕ, пожалуйста. - person Artem Bilan; 19.08.2017
comment
У меня есть еще один вопрос об обработке ошибок. Речь идет об обработке проверенных исключений. Если хотите, могу задать другой вопрос. Мой метод аутентификации также может генерировать проверенные исключения, и если я не хочу их перехватывать, мой метод IntegrationBuilderFlow (type1) выдает ошибку компиляции. Из-за этого я выбрасываю свои исключения ниже: throw new MessageHandlingException (MessageBuilder.withPayload (message) .build (), new NoSuchMethodException (Failed)); Это правильный путь или есть способ для этого процесса? - person gbalcisoy; 21.08.2017