Запрос журнала, ответ и общее время, затраченное потоком с использованием Java DSL

Я пытаюсь создать поток, который начинается с Http.inboundGateway -> выполняет несколько действий, таких как сохранение данных запроса в базе данных, обогащение заголовка, отправка в AMQP и возврат со статусом потока (успешно/сбой).

У меня есть несколько вещей, с которыми я борюсь и не могу понять.

1.) Регистрация запроса и ответа.

Мне удалось зарегистрировать запрос, полученный Http.inboundGateway (см. ниже. Не уверен, что это правильный способ сделать это, но он работает. Пожалуйста, предложите лучший способ сделать это). Тем не менее, я не могу получить ответное сообщение, которое отправляется клиенту, а также у меня нет идей о том, как рассчитать время транзакции потока и записать его в файл журнала. Было бы очень полезно, ЕСЛИ бы у меня был способ печатать статистику после каждой транзакции, например "Получено: 5, Успешно: 4, Неудачно: 1, Среднее время транзакции: 250 мс .. и т. д."

@Bean
public IntegrationFlow httpInboundGateway()
{
    return IntegrationFlows.from(Http.inboundGateway("/httplistner")
                .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                .mappedRequestHeaders("*"))
            .transform(new ObjectToStringTransformer())
            .wireTap(flow -> flow.handle(message -> logger.info(">> Received Request from Caller.\nHeaders : "+message.getHeaders() + "\nPay Load : "+message.getPayload())))
            .channel(httpRequestChannel())
            .get();
}

2.) Как добавить операторы журнала в Spring DSL Flow?

Я хочу иметь возможность добавлять операторы журнала (для отладки) в свои определения DSL для интеграции, чтобы я мог просматривать файл журнала и понимать, что произошло и что пошло не так. На данный момент я НЕ смог найти способ сделать это, кроме добавления «.wireTap» в середине потока, как показано в определении выше. Пожалуйста, предложите, если есть лучший/правильный способ сделать это.

3.) Настройка ответа, отправляемого "Http.inboundGateway".

Я не мог понять, как настроить ответ HTTP, который Http.inboundGateway отправляет обратно клиентам после завершения потока. Как я могу это сделать, или вы можете указать мне документацию, где я могу прочитать и понять, как это сделать? Я хочу использовать Spring DSL.

То же самое относится и к ответам об ошибках. Как видите, я НЕ добавил канал ошибок в свой Http.inboundGateway. Таким образом, если ошибка происходит сейчас в его текущей конфигурации, клиент получает 500 и полную трассировку стека. Как получить сообщение об ошибке и создать собственный ответ на основе ошибки и отправить его клиенту. Пример: если они отправили мне полезную нагрузку XML, а XML искажен, я хочу иметь возможность отправить им HTTP 400 с некоторыми подробностями в ответе, указывающими, что их данные запроса неправильно сформированы.


person Rajeev K    schedule 01.02.2017    source источник
comment
Я склонен советовать всем участникам здесь, и особенно новым пользователям, не делать явных запросов в форме, пожалуйста, помогите мне с примером. Это потому, что это звучит в точности как люди, которые приходят и хотят, чтобы кто-то сделал всю их работу за них. Имейте в виду, что мы получаем здесь много попрошайничества, так что если вы приложите усилия, чтобы не звучать как эти люди, это вам очень поможет.   -  person halfer    schedule 02.02.2017
comment
@halfer: Извините за то, что это звучит отчаянно. Мне жаль, если так вышло. Будьте уверены, я сделал все возможное, чтобы понять это самостоятельно. К сожалению, я новичок в Spring Integration, и я не мог соединить точки, глядя на документацию (которую я прочитал как минимум 2-3 раза :-)).   -  person Rajeev K    schedule 02.02.2017


Ответы (1)


Трюк для захвата вывода заключается в том, что log() следует мост в никуда, названный так потому, что у него нет выходного канала, поэтому фреймворк отправляет результат обратно в шлюз.

Ну вот...

@SpringBootApplication
public class So41990546Application {

    public static void main(String[] args) {
        SpringApplication.run(So41990546Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(Http.inboundGateway("/foo")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class))
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .<String, String>transform(String::toUpperCase)
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Bridge.to.nowhere"))
                .get();
    }

    @Bean
    public IntegrationFlow errorsFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/errors")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class)
                    .errorChannel("errors.input"))
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .transform("1 / 0")
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Another.bridge.to.nowhere"))
                .get();
    }

    @Bean
    public IntegrationFlow errors() {
        return f -> f.transform("'Error: ' + payload.cause.message")
                .enrichHeaders(b -> b.header(HttpHeaders.STATUS_CODE, 400))
                .log(Level.INFO) // log the whole message so we can see the status code
                .bridge(e -> e.id("Another.b.t.n"));
    }

}

.log был добавлен в 1.2 и использует прослушивание телефонных разговоров.

ИЗМЕНИТЬ

Если вы используете именованный канал...

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(Http.inboundGateway("/foo")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class))
                .channel(namedChannel())
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .<String, String>transform(String::toUpperCase)
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Bridge.to.nowhere"))
                .get();
    }

    public MessageChannel namedChannel() {
        return new DirectChannel();
    }

и включите метрики, как описано в документации вы можете получить все виды статистики из этого канала, которые будут включать среднее прошедшее время для нисходящего потока.

person Gary Russell    schedule 01.02.2017
comment
Большое спасибо за подробный пример, Гари. Я попробую и отчитаюсь. - person Rajeev K; 02.02.2017