Как вы читаете и печатаете разделенный на фрагменты HTTP-ответ с помощью java.net.http по мере поступления фрагментов?

В Java 11 представлен новый пакет java.net.http для выполнения HTTP-запросов. Для общего использования это довольно просто.

Мой вопрос: как мне использовать java.net.http для обработки фрагментированных ответов, когда каждый фрагмент получен клиентом?

java.http.net содержит реактивный BodySubscriber, который кажется мне нужным, но я не могу найти пример его использования.

http_get_demo.py

Ниже приведена реализация Python, которая печатает фрагменты по мере их поступления, я хотел бы сделать то же самое с java.net.http:

import argparse
import requests


def main(url: str):
    with requests.get(url, stream=True) as r:
        for c in r.iter_content(chunk_size=1):
            print(c.decode("UTF-8"), end="")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Read from a URL and print as text as chunks arrive")
    parser.add_argument('url', type=str, help="A URL to read from")
    args = parser.parse_args()

    main(args.url)

HttpGetDemo.java

Для полноты картины приведем простой пример создания запроса на блокировку с использованием java.net.http:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

public class HttpGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();
    var response = client.send(request, bodyHandler);
    System.out.println(response.body());

  }
}

HttpAsyncGetDemo.java

А вот пример неблокирующего/асинхронного запроса:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

/**
 * ReadChunked
 */
public class HttpAsyncGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();

    client.sendAsync(request, bodyHandler)
            .thenApply(HttpResponse::body)
            .thenAccept(System.out::println)
            .join();

  }
}

person hohonuuli    schedule 05.10.2018    source источник


Ответы (4)


Код Python не гарантирует, что данные тела ответа будут доступны в одном блоке HTTP единовременно. Он просто предоставляет приложению небольшие объемы данных, тем самым уменьшая объем памяти, потребляемой на уровне приложения (она может быть буферизована ниже в стеке). HTTP-клиент Java 11 поддерживает потоковую передачу через один из обработчиков тела потоковой передачи, HttpResponse.BodyHandlers: ofInputStream, ofByteArrayConsumer, asLines и т. д.

Или напишите свой собственный обработчик/подписчик, как показано: https://www.youtube.com/watch?v=qiaC0QMLz5Y

person chegar999    schedule 12.10.2018

Вы можете печатать ByteBuffer по мере их поступления, но нет гарантии, что ByteBuffer соответствует фрагменту. Чанки обрабатываются стеком. Для каждого чанка будет помещен один ByteBuffer слайс, но если в буфере недостаточно места, то будет вытеснен неполный чанк. Все, что видит потребитель, — это поток ByteBuffer, содержащих данные. Итак, что вы можете сделать, так это распечатать эти ByteBuffer по мере их поступления, но у вас нет гарантии, что они соответствуют ровно одному фрагменту, который был отправлен сервером.

Примечание. Если тело вашего запроса основано на тексте, вы можете использовать BodyHandlers.fromLineSubscriber(Subscriber<? super String> subscriber) с пользовательским Subscriber<String>, который будет печатать каждую строку по мере ее поступления. BodyHandlers.fromLineSubscriber выполняет жесткое слово декодирования байтов в символы с использованием набора символов, указанного в заголовках ответа, при необходимости буферизуя байты до тех пор, пока они не будут декодированы (байтовый буфер может заканчиваться в середине последовательности кодирования, если текст содержит символы, закодированные в нескольких байтах). ), и разбивая их на границе строки. Метод Subscriber::onNext будет вызываться один раз для каждой строки в тексте. См. https://download.java.net/java/early_access/jdk11/docs/api/java.net.http/java/net/http/HttpResponse.BodyHandlers.html#fromLineSubscriber(java.util.concurrent.Flow.Subscriber) для получения дополнительной информации.

person daniel    schedule 12.10.2018

Спасибо @pavel и @chegar999 за частичные ответы. Они привели меня к моему решению.

Обзор

Решение, которое я придумал, приведено ниже. По сути, решение состоит в том, чтобы использовать пользовательский файл java.net.http.HttpResponse.BodySubscriber. BodySubscriber содержит реактивные методы (onSubscribe, onNext, onError и onComplete) и метод getBody, который в основном возвращает java CompletableFuture, который в конечном итоге создаст тело HTTP-запроса. Когда у вас есть BodySubscriber, вы можете использовать его следующим образом:

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create(uri))
    .build();

return client.sendAsync(request, responseInfo -> new StringSubscriber())
    .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
    .thenApply(HttpResponse::body);

Обратите внимание на строку:

client.sendAsync(request, responseInfo -> new StringSubscriber())

Здесь мы регистрируем нашего пользовательского BodySubscriber; в этом случае мой пользовательский класс называется StringSubscriber.

CustomSubscriber.java

Это полный рабочий пример. Используя Java 11, вы можете запустить его без компиляции. Просто вставьте его в файл с именем CustomSubscriber.java, затем запустите команду java CustomSubscriber <some url>. Он печатает содержимое каждого фрагмента по мере его поступления. Он также собирает их и возвращает в виде тела после завершения ответа.

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.List;

public class CustomSubscriber {

  public static void main(String[] args) {
    CustomSubscriber cs = new CustomSubscriber();
    String body = cs.get(args[0]).join();
    System.out.println("--- Response body:\n: ..." + body + "...");
  }

  public CompletableFuture<String> get(String uri) {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(uri))
        .build();

    return client.sendAsync(request, responseInfo -> new StringSubscriber())
        .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
        .thenApply(HttpResponse::body);
  }

  static class StringSubscriber implements BodySubscriber<String> {

    final CompletableFuture<String> bodyCF = new CompletableFuture<>();
    Flow.Subscription subscription;
    List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();

    @Override
    public CompletionStage<String> getBody() {
      return bodyCF;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // Request first item
    }

    @Override
    public void onNext(List<ByteBuffer> buffers) {
      System.out.println("-- onNext " + buffers);
      try {
        System.out.println("\tBuffer Content:\n" + asString(buffers));
      } 
      catch (Exception e) {
        System.out.println("\tUnable to print buffer content");
      }
      buffers.forEach(ByteBuffer::rewind); // Rewind after reading
      responseData.addAll(buffers);
      subscription.request(1); // Request next item
    }

    @Override
    public void onError(Throwable throwable) {
      bodyCF.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
      bodyCF.complete(asString(responseData));
    }

    private String asString(List<ByteBuffer> buffers) {
      return new String(toBytes(buffers), StandardCharsets.UTF_8);
    }

    private byte[] toBytes(List<ByteBuffer> buffers) {
      int size = buffers.stream()
          .mapToInt(ByteBuffer::remaining)
          .sum();
      byte[] bs = new byte[size];
      int offset = 0;
      for (ByteBuffer buffer : buffers) {
        int remaining = buffer.remaining();
        buffer.get(bs, offset, remaining);
        offset += remaining;
      }
      return bs;
    }

  }
}

Пробовать это

Чтобы протестировать это решение, вам понадобится сервер, который отправляет ответ, использующий Transfer-encoding: chunked, и отправляет его достаточно медленно, чтобы наблюдать за поступлением фрагментов. Я создал его на https://github.com/hohonuuli/demo-chunk-server, но вы можете раскрутить его с помощью Docker следующим образом:

docker run -p 8080:8080 hohonuuli/demo-chunk-server

Затем запустите код CustomSubscriber.java, используя java CustomSubscriber.java http://localhost:8080/chunk/10

person hohonuuli    schedule 30.10.2018

Теперь существует новая библиотека Java для удовлетворения таких требований RxSON: https://github.com/rxson/rxson Он использует JsonPath с RxJava для считывания потоков JSON-фрагментов из ответа, как только они прибывают, и анализирует их в Java-объекты.

Пример:

String serviceURL = "https://think.cs.vt.edu/corgis/datasets/json/airlines/airlines.json";
   HttpRequest req = HttpRequest.newBuilder(URI.create(serviceURL)).GET().build();
   RxSON rxson = new RxSON.Builder().build();

   String jsonPath = "$[*].Airport.Name";
   Flowable<String> airportStream = rxson.create(String.class, req, jsonPath);
   airportStream
       .doOnNext(it -> System.out.println("Received new item: " + it))
       //Just for test
       .toList()
       .blockingGet();
person Mohamed Aly    schedule 23.10.2020
comment
в исходном вопросе даже упоминался JSON в качестве типа контента ответов? - person Michael Welch; 23.10.2020