Правильный способ потоковой передачи с использованием ResponseEntity и обеспечение закрытия InputStream

Одно из наших приложений допускает утечку файловых дескрипторов, и мы еще не нашли причину этого.

В коде я вижу несколько функций, похожих на эту:

public ResponseEntity<InputStreamResource> getFoo( ... ) {
    InputStream content = getContent(...)
    InputStreamResource isr = new InputStreamResource(content);
    return ResponseEntity.status(HttpServletResponse.SC_OK).body(isr);
}

(проверки if и try / catch удалены для краткости)

Я уверен, что этот раздел вызывает проблему, потому что, когда я тестирую этот конкретный код с помощью JMeter, я вижу, что getContent() не работает на этом этапе:

is = Files.newInputStream(f.toPath());

Обычно я закрываю InputStream, но из-за этого короткого и простого кода я не могу закрыть поток перед return или вызовом body.

Когда я запускаю lsof (код работает в Linux), я вижу, что тысячи файлов открыты в режиме чтения. Поэтому я уверен, что эта проблема вызвана тем, что поток не закрывается.

Есть ли код передовой практики, которым я должен торговать?


person Marged    schedule 14.08.2018    source источник
comment
Просто предположение, можете ли вы проверить утечку в запросах HEAD или GET?   -  person reith    schedule 14.08.2018
comment
Насколько я понимаю, с HEAD этого не происходит, только с GET   -  person Marged    schedule 14.08.2018
comment
Хорошо, я предполагаю, что поскольку неявный HEAD не будет потреблять тело, это может вызвать утечку. Это не так.   -  person reith    schedule 14.08.2018


Ответы (4)


вы можете попробовать использовать StreamingResponseBody

StreamingResponseBody

Тип возвращаемого значения метода контроллера для асинхронной обработки запроса, когда приложение может писать непосредственно в поток ответа OutputStream, не задерживая поток контейнера сервлета.

Поскольку вы работаете в отдельном потоке и пишете непосредственно в ответ, ваша проблема с вызовом close() до return решена.

вероятно, вы можете начать со следующего примера

public ResponseEntity<StreamingResponseBody> export(...) throws FileNotFoundException {
    //...

    InputStream inputStream = new FileInputStream(new File("/path/to/example/file"));


    StreamingResponseBody responseBody = outputStream -> {

        int numberOfBytesToWrite;
        byte[] data = new byte[1024];
        while ((numberOfBytesToWrite = inputStream.read(data, 0, data.length)) != -1) {
            System.out.println("Writing some bytes..");
            outputStream.write(data, 0, numberOfBytesToWrite);
        }

        inputStream.close();
    };

    return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=generic_file_name.bin")
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(responseBody);
}

Вы также можете попробовать использовать Files (начиная с java 7)

так что вам не нужно управлять InputStream

    File file = new File("/path/to/example/file");

    StreamingResponseBody responseBody = outputStream -> {
        Files.copy(file.toPath(), outputStream);
    };

Как описано в комментарии @Stackee007, при большой нагрузке в производственной среде рекомендуется также определить класс @Configuration для TaskExecutor для настройки параметров и управления процессами Async.

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

    private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final TaskExecutionProperties taskExecutionProperties;

    public AsyncConfiguration(TaskExecutionProperties taskExecutionProperties) {
        this.taskExecutionProperties = taskExecutionProperties;
    }

    //  ---------------> Tune parameters here
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(taskExecutionProperties.getPool().getCoreSize());
        executor.setMaxPoolSize(taskExecutionProperties.getPool().getMaxSize());
        executor.setQueueCapacity(taskExecutionProperties.getPool().getQueueCapacity());
        executor.setThreadNamePrefix(taskExecutionProperties.getThreadNamePrefix());
        return executor;
    }
    
    //  ---------------> Use this task executor also for async rest methods
    @Bean
    protected WebMvcConfigurer webMvcConfigurer() {
        return new WebMvcConfigurer() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setTaskExecutor(getTaskExecutor());
            }
        };
    }

    @Bean
    protected ConcurrentTaskExecutor getTaskExecutor() {
        return new ConcurrentTaskExecutor(this.getAsyncExecutor());
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

Как протестировать с помощью mockMvc

Вы можете просто следовать этому примеру кода в своем интеграционном тесте как:

    .andExpect(request().asyncStarted())
    .andDo(MvcResult::getAsyncResult)
    .andExpect(status().isOk()).getResponse().getContentAsByteArray();

Тип содержимого ResponseEntity<StreamingResponseBody> в этом примере — это MediaType.APPLICATION_OCTET_STREAM, и вы можете получить byte[] (.getContentAsByteArray()), но вы можете получить String/Json/открытый текст всего, в зависимости от типа содержимого ответа вашего тела.

person ValerioMC    schedule 26.09.2018
comment
Это выглядит многообещающе. Вы хотите немного усилить это, добавив больше информации? - person Marged; 27.09.2018
comment
Обратите внимание на примечание об использовании StreamingBodyResponse. Примечание: при использовании этого параметра настоятельно рекомендуется явно настроить TaskExecutor, используемый в Spring MVC для выполнения асинхронных запросов. Как конфигурация Java MVC, так и пространства имен MVC предоставляют параметры для настройки асинхронной обработки. Если они не используются, приложение может установить свойство taskExecutor RequestMappingHandlerAdapter. - person Stackee007; 27.09.2018
comment
добавлена ​​реализация конфигурации TaskExecutor - person ValerioMC; 10.07.2019
comment
Как протестировать с помощью mockMvc — добавьте .andExpect(request().asyncStarted()) .andDo(MvcResult::getAsyncResult) в свои интеграционные тесты. источник - person user11153; 28.01.2020
comment
я написал небольшой пример с интеграционным тестом mockMvc - person ValerioMC; 07.07.2020
comment
Можно ли использовать StreamingBodyResponse с Jdk 1.6, у меня есть устаревший проект, в котором необходимо использовать асинхронную потоковую передачу ответов. - person fiddle; 08.03.2021

Предполагая, что вы используете Spring, ваш метод может вернуть Resource и пусть Spring обрабатывает все остальное (включая закрытие базового потока). Существует несколько реализаций Resource. доступны в Spring API, иначе вам нужно реализовать свой собственный. В конце концов, ваш метод станет простым и будет выглядеть примерно так:

public ResponseEntity<Resource> getFo0(...) {
    return new InputStreamResource(<Your input stream>);
}
person Stackee007    schedule 24.09.2018

Поскольку этот InputStream в основном происходит из простого файла, хорошей заменой является этот код:

FileSystemResource fsr = new FileSystemResource(fileName);
return ResponseEntity.status(HttpServletResponse.SC_OK).body(fsr);

FileSystemResource может принимать java.util.File, java.nio.file.Path или даже String, указывающие на соответствующий файл.

person Marged    schedule 22.09.2018
comment
Обрабатывает ли это закрытие ресурса и разные кодировки (например, iso lat 1)? - person NiharGht; 20.07.2021

Вы можете реорганизовать все методы вашего контроллера, которые читают локальные файлы и устанавливают их содержимое в качестве тела ответа HTTP:

Вместо использования подхода ResponseEntity вы вводите базовый HttpServletResponse и копируете байты входного потока, возвращенного из вашего метода getContent(...), в выходной поток HttpServletResponse, например. с помощью служебных методов, связанных с вводом-выводом, Apache CommonsIO или библиотеки Google Guava. В любом случае убедитесь, что вы закрыли входной поток! В приведенном ниже коде это делается неявно с помощью оператора try-with-resources, который закрывает объявленный входной поток в конце оператора.

@RequestMapping(value="/foo", method=RequestMethod.GET)
public void getFoo(HttpServletResponse response) {
    // use Java7+ try-with-resources
    try (InputStream content = getContent(...)) {

        // if needed set content type and attachment header
        response.addHeader("Content-disposition", "attachment;filename=foo.txt");
        response.setContentType("txt/plain");

        // copy content stream to the HttpServletResponse's output stream
        IOUtils.copy(myStream, response.getOutputStream());

        response.flushBuffer();
    }
}

ссылка:

https://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html https://google.github.io/guava/releases/19.0/api/docs/com/google/common/io/ByteStreams.html https://commons.apache.org/proper/commons-io/javadocs/api-release/index.html

(особенно обратите внимание на методы public static int copy(InputStream input, OutputStream output) throws IOException и public static int copyLarge(InputStream input, OutputStream output) throws IOException класса org.apache.commons.io.IOUtils)

person Tommy Brettschneider    schedule 24.09.2018
comment
Мне нравится подход @Marged (если у него нет предостережений, о которых я не знаю) немного больше за его краткость и, возможно, более последовательную работу с ReponseEntity во всех конечных точках. Тем не менее, мы использовали подход Томми в нескольких приложениях, и он работает безупречно. - person Jan B.; 27.09.2018
comment
FileCopyUtils Spring также можно использовать - он закрывает поток для вас, поэтому нужно помнить на одну вещь меньше, чем служебные методы, связанные с вводом-выводом, Apache CommonsIO или библиотеки Google Guava. - person Krzysztof Skrzynecki; 11.08.2019
comment
Правильно ли я понимаю, что главное преимущество StreamingResponseBody перед использованием HttpServletResponse#getOutputStream заключается в том, что вы можете запустить TaskExecutor, поэтому запись данных в поток не будет задерживать поток контейнера сервлета занятым? Я просто пытаюсь понять, в чем разница между этими двумя. - person Krzysztof Skrzynecki; 11.08.2019