WebClient не читает ответ, пока не завершится запись запроса.

Я пытаюсь реализовать потоковый прокси. Я столкнулся с проблемой с WebClient из Spring Reactive.

Может ли кто-нибудь помочь мне понять, я ошибаюсь или это просто ошибка со стороны WebClient?

Стек:

response-netty 0.7.8.RELEASE

Spring-boot 2.0.4.RELEASE

Desc:

Я хочу передать длинный поток внешней службе, а затем переслать поток ответов запрашивающей стороне. Потоковая передача осуществляется с использованием фрагментов (HTTP 1.1 Transfer-Encoding: фрагментировано). Внешний сервис обрабатывает каждый чанк и отправляет результат ответа.

Ожидаемое поведение:

WebClient должен немедленно прочитать каждую полученную часть ответа.

Фактическое поведение:

WebClient не запускает процесс ответа до тех пор, пока запись запроса не будет завершена.

Код:

return client
    .post()
    .header("Transfer-Encoding", "chunked")
//because I want to flush each received part
    .body((outputMessage, context) -> outputMessage.writeAndFlushWith(
        request.body(BodyExtractors.toDataBuffers())
               .map(dataBuffer -> Mono.just(dataBuffer))))
    .exchange()
    .flatMap(clientResponse -> {
      ServerResponse.BodyBuilder bodyBuilder = ServerResponse.status(clientResponse.statusCode());
      bodyBuilder.contentType(MediaType.APPLICATION_STREAM_JSON);

      return bodyBuilder.body((outputMessage, context) ->                                                        
          outputMessage.writeAndFlushWith(                                               
            clientResponse.body(BodyExtractors.toDataBuffers())                                                               
                          .map(dataBuffer -> Mono.just(dataBuffer))
                         ));}
);

person smoomrik    schedule 31.08.2018    source источник


Ответы (2)


Я просмотрел его, и мне кажется, что по дизайну и Spring WebFlux WebClient, и Reactor Netty HttpClient спроектированы так, чтобы сначала обрабатывать запрос (отправка тела запроса), а затем читать тело ответа.

Другие HTTP-клиенты могут позволить это, но я думаю, что в данном случае это способ связать противодавление как для операций чтения / записи, так и для объединения всего в единый реактивный конвейер.

Возможно, вы ищете ориентированный на сообщения двунаправленный транспортный протокол с поддержкой обратного давления. Вы можете взглянуть на WebSockets (хотя вам нужно будет определить там свою собственную семантику сообщений) или следить за RSocket.

Если вы просто ищете эффективный реактивный шлюз, тогда Spring Cloud Gateway - ваш лучший выбор, поскольку он полностью реагирует и поддерживает интересные дополнительные функции.

Несколько дополнительных примечаний:

Spring WebFlux (как на уровне клиента, так и на уровне сервера) использует реализации Encoder и Decoder, адаптируясь к Content-Type сообщения. Некоторые конкретные типы контента, такие как application/streaming+json или text/event-stream, реализованы с учетом сценариев потоковой передачи. Это означает, что кодировщики пишут сообщения по мере их поступления, разделяются определенными символами и сбрасываются в сети. Использование обычных типов мультимедиа, таких как application/octet-stream или application/json, не вызовет такого поведения. В таких случаях прокси и посредники могут буферизовать тело сообщения и доставлять окна большего / меньшего размера. Вот почему такие механизмы требуют разделителя между сообщениями и соответствующими кодеками.

Насколько я понимаю, вы используете HTTP 1.1, который использует механизм запроса / ответа - спецификация HTTP явно не запрещает серверу писать ответ перед чтением полного запроса, но в нем говорится, что он должен прочитать полное тело запроса (или закрыть соединение), несмотря ни на что. См. https://tools.ietf.org/html/rfc7230#section-3.4

Как всегда, вы можете запросить улучшения на https://jira.spring.io, хотя в этом случае я думаю, это сделано намеренно.

person Brian Clozel    schedule 31.08.2018
comment
Я говорю о заголовке типа содержимого HTTP. Если вы не рассматриваете это или если это «приложение / октет-поток», то writeAndFlushWith контрпродуктивен, а регулярная запись лучше. - person Brian Clozel; 01.09.2018
comment
Тип содержимого - это просто байты, разделенные на блоки. Я хочу реализовать аналогичный процесс, как в SSE, но с использованием фрагментов кодирования передачи. Спецификация HTTP, если я правильно понимаю, не имеет строгих ограничений на чтение ответа до того, как запрос будет полностью написан. Что касается снижения производительности при сбросе, я собираюсь сравнить результаты с / без очистки каждого фрагмента. Во всяком случае, вопрос не об этом. Вопрос, почему веб-клиент не слушает поток ответов? - person smoomrik; 01.09.2018
comment
Что касается промывки, как я могу принудительно промыть, когда получено некоторое количество кусков? Это вообще возможно? - person smoomrik; 01.09.2018
comment
Интересно. Я должен это проверить, но по замыслу WebClient может написать все до того, как начать читать ответ. Можно возразить, что сервер не должен отвечать до получения полного тела сообщения, потому что он должен закрыть соединение, если оно недействительно. См. tools.ietf.org/html/rfc7230#section-3.4. - person Brian Clozel; 01.09.2018
comment
Вы правы, этот вопрос не о смывании, давайте рассмотрим это в другом вопросе. - person Brian Clozel; 01.09.2018
comment
Также: не могли бы вы ответить на мой вопрос? Каково точное значение заголовка типа содержимого http? - person Brian Clozel; 01.09.2018
comment
Я не указываю это в запросе, а внешняя служба отвечает application / json, но тело содержит много объектов, которые поступают по частям. 1 кусок == 1 объект json. - person smoomrik; 01.09.2018
comment
Значит, вы отправляете запросы на публикацию без типа контента? - person Brian Clozel; 01.09.2018
comment
Я пробовал протестировать этот вариант использования с application/streaming+json, но все равно получил описанное вами поведение. Я обновил свой ответ, насколько мне известно. - person Brian Clozel; 01.09.2018
comment
Большое спасибо. Я отмечаю как отвеченный, но это конкретное поведение должно быть в документации. Также я не могу использовать websocket или просто tcp, так как эта внешняя служба не находится под моим контролем. Они нарушают правила :) - person smoomrik; 01.09.2018

Только что протестировали реализацию WebClient на основе Jetty, и она ведет себя так, как вы ожидаете. Он может начать чтение ответа до того, как будет отправлено все содержимое запроса. Он должен был быть выпущен в Spring Framework 5.1. WebClient on Jetty new feature issue

person Sergii Karpenko    schedule 22.10.2018