Spring WebClient: как передать большой байт [] в файл?

Похоже, Spring RestTemplate не может передавать ответ напрямую в файл без буферизации всего этого в памяти. Как правильно достичь этого с помощью более новой Spring 5 WebClient?

WebClient client = WebClient.create("https://example.com");
client.get().uri(".../{name}", name).accept(MediaType.APPLICATION_OCTET_STREAM)
                    ....?

Я вижу, что люди нашли несколько обходных путей / приемов решения этой проблемы с RestTemplate, но меня больше интересует правильное решение этой проблемы с WebClient.

Существует множество примеров использования RestTemplate для загрузки двоичных данных, но почти все они загружают byte[] в память.


person Dave L.    schedule 19.05.2019    source источник
comment
Вы можете проверить уже доступные stackoverflow.com/questions/32988370/   -  person Sambit    schedule 19.05.2019
comment
Спасибо, но это не показывает, как это сделать с помощью WebClient.   -  person Dave L.    schedule 19.05.2019
comment
Чтобы решить проблему, вы можете использовать RestTemplate of Spring. Однако Spring 5 представил Webclient.   -  person Sambit    schedule 19.05.2019
comment
Вы также можете сослаться на эту ссылку stackoverflow.com/questions/46740000/   -  person Sambit    schedule 19.05.2019
comment
Не думаю, что это ответ на вопрос. Пожалуйста, дайте ответ, если вы так думаете.   -  person Dave L.    schedule 19.05.2019
comment
Возможный дубликат Spring WebFlux Webclient, получающий application / octet-stream файл как Mono   -  person K.Nicholas    schedule 19.05.2019
comment
@ K.Nicholas - Вы действительно думаете, что это дубликат того вопроса? Во-первых, в этом вопросе не упоминается потоковая передача непосредственно в файл (без сохранения всего ответа в памяти), что является основным моментом моего вопроса; а также этот вопрос использует Kotlin, а не Java.   -  person Dave L.    schedule 20.05.2019
comment
@DaveL. - Да, ты прав, надо было отметить это как не по теме. Сообщите нам, если у вас возникнут проблемы.   -  person K.Nicholas    schedule 20.05.2019
comment
›Да, вы правы, надо было отметить это как не по теме. @ K.Nicholas Я не уверен, почему вы пытаетесь найти способ опровергнуть мой вопрос, но не стесняйтесь просмотрите stackoverflow.com / help / on-topic и кодекс поведения.   -  person Dave L.    schedule 21.05.2019
comment
Это скорее просьба написать для вас код, чем ответ на вопрос о проблеме, с которой вы столкнулись. Неважно, может быть, кто-то здесь сделает это за вас. Я недостаточно заинтересован, чтобы делать это самому. Позже я увидел ответы ближе, чем тот, который я опубликовал, но я не увидел ничего очевидного, включая сохранение потока по мере его поступления. Похоже, вам нужно будет открыть файловый поток, а также поток ответов и скопировать блоки данных между ними.   -  person K.Nicholas    schedule 21.05.2019
comment
Просто чтобы прояснить для других; это не совсем так. Ссылка на конкретный пример, описание + ссылка на правильный api или, самое большее, пара строк кода примера вполне достаточно.   -  person Dave L.    schedule 21.05.2019
comment
Удачи с этим @DaveL ??   -  person James Gawron    schedule 08.06.2019
comment
@JamesGawron Нет, у меня не было возможности проверить ответ ниже.   -  person Dave L.    schedule 21.06.2019
comment
@DaveL. Любые нижеприведенные решения работали для вас без загрузки файла в память. На самом деле, у меня такая же проблема, как и у вас.   -  person shashantrika    schedule 08.12.2019


Ответы (4)


С недавним стабильным Spring WebFlux (5.2.4.RELEASE на момент написания):

final WebClient client = WebClient.create("https://example.com");
final Flux<DataBuffer> dataBufferFlux = client.get()
        .accept(MediaType.TEXT_HTML)
        .retrieve()
        .bodyToFlux(DataBuffer.class); // the magic happens here

final Path path = FileSystems.getDefault().getPath("target/example.html");
DataBufferUtils
        .write(dataBufferFlux, path, CREATE_NEW)
        .block(); // only block here if the rest of your code is synchronous

Для меня неочевидной частью был bodyToFlux(DataBuffer.class), как он сейчас упоминается в общий раздел о потоковой передаче документации Spring, нет прямой ссылки на него в разделе WebClient.

person Z4-    schedule 17.03.2020
comment
DataBuffer в основном выглядит как ByteBuffer. Я не вижу никакой координации между читателем и писателем, ни какого-либо способа установить ограничение на размер буфера. Как узнать, что DataBuffer является реактивным (или многопоточным) и имеет ограниченный размер? - person Shannon; 11.11.2020
comment
Неважно, очевидно, что поток генерирует столько буферов данных, сколько нужно, каждый из которых содержит фрагмент данных ответа. Не уверен, как определяется размер, может в конфиге Netty. И каждый DataBuffer завершен, когда он передается в Flux, поэтому координация не требуется. - person Shannon; 11.11.2020

Я не могу проверить, эффективно ли следующий код не буферизует содержимое webClient полезной нагрузки в памяти. Тем не менее, я думаю, вам следует начать с этого:

public Mono<Void> testWebClientStreaming() throws IOException {
    Flux<DataBuffer> stream = 
            webClient
                    .get().accept(MediaType.APPLICATION_OCTET_STREAM)
                    .retrieve()
            .bodyToFlux(DataBuffer.class);
    Path filePath = Paths.get("filename");
    AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(filePath, WRITE);
    return DataBufferUtils.write(stream, asynchronousFileChannel)
            .doOnNext(DataBufferUtils.releaseConsumer())
            .doAfterTerminate(() -> {
                try {
                    asynchronousFileChannel.close();
                } catch (IOException ignored) { }
            }).then();
}
person Felipe Moraes    schedule 23.05.2019
comment
К вашему сведению, в квартале появился новый ребенок. См. DataBufferUtils # write (Publisher ‹DataBuffer›, Path, OpenOption ...) - person Jin Kwon; 05.10.2019

Сохраните тело во временном файле и используйте

static <R> Mono<R> writeBodyToTempFileAndApply(
        final WebClient.ResponseSpec spec,
        final Function<? super Path, ? extends R> function) {
    return using(
            () -> createTempFile(null, null),
            t -> write(spec.bodyToFlux(DataBuffer.class), t)
                    .thenReturn(function.apply(t)),
            t -> {
                try {
                    deleteIfExists(t);
                } catch (final IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
    );
}

Трубить тело и потреблять

static <R> Mono<R> pipeBodyAndApply(
        final WebClient.ResponseSpec spec, final ExecutorService executor,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(
            Pipe::open,
            p -> {
                final Future<Disposable> future = executor.submit(
                        () -> write(spec.bodyToFlux(DataBuffer.class), p.sink())
                                .log()
                                .doFinally(s -> {
                                    try {
                                        p.sink().close();
                                        log.debug("p.sink closed");
                                    } catch (final IOException ioe) {
                                        throw new RuntimeException(ioe);
                                    }
                                })
                                .subscribe(DataBufferUtils.releaseConsumer())
                );
                return just(function.apply(p.source()))
                        .log()
                        .doFinally(s -> {
                            try {
                                final Disposable disposable = future.get();
                                assert disposable.isDisposed();
                            } catch (InterruptedException | ExecutionException e) {
                                e.printStackTrace();
                            }
                        });
            },
            p -> {
                try {
                    p.source().close();
                    log.debug("p.source closed");
                } catch (final IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
    );
}
person Jin Kwon    schedule 05.10.2019
comment
Разве этот ответ не дублирует stackoverflow.com/a/56096484/839733? - person Abhijit Sarkar; 26.07.2020
comment
@AbhijitSarkar Да. - person Jin Kwon; 26.07.2020

Я не уверен, есть ли у вас доступ к RestTemplate в вашем текущем использовании spring, но этот сработал для меня.


RestTemplate restTemplate // = ...;

RequestCallback requestCallback = request -> request.getHeaders()
        .setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL));

// Streams the response
ResponseExtractor<Void> responseExtractor = response -> {
    // Here I write the response to a file but do what you like
    Path path = Paths.get("http://some/path");
    Files.copy(response.getBody(), path);
    return null;
};
restTemplate.execute(URI.create("www.something.com"), HttpMethod.GET, requestCallback, responseExtractor);

person Sunny Pelletier    schedule 26.05.2019
comment
Спасибо за ответ. Я вижу, что код такой же, как здесь: stackoverflow.com/a/38664475/449652. Однако это больше не работает с Spring 5 или новее - см. Эту проблему: github.com/spring-projects/spring-framework/issues/19448 - person Dave L.; 28.05.2019
comment
На оригинальном плакате говорилось, что он хотел бы сделать это должным образом, Webclient не блокируя. - person Z4-; 17.03.2020
comment
RestTemplate скоро станет устаревшим. - person NobodySomewhere; 28.10.2020