Понимание предупреждения Jetty Closed while Pending/Unready

У нас есть асинхронный сервлет, который создает следующий журнал предупреждений от Jetty:

java.io.IOException: Closed while Pending/Unready

После включения журналов отладки я получил следующую трассировку стека:

WARN [jetty-25948] (HttpOutput.java:278)   - java.io.IOException: Closed while Pending/Unready
DEBUG [jetty-25948] (HttpOutput.java:279)   - 
java.io.IOException: Closed while Pending/Unready
        at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:277) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:488) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:708) [jetty-util.jar:9.4.8.v20171121]
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:626) [jetty-util.jar:9.4.8.v20171121]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

Это не слишком помогло.

Предупреждение появляется только после того, как Jetty вызывает метод onTimeout() нашего AsyncListener. QA иногда мог воспроизвести это, используя kill -9 в клиентском приложении.

Как я могу воспроизвести это предупреждение с помощью примера кода сервлет-клиент? Я хотел бы понять эту проблему в более простой среде, чем наш производственный код, чтобы впоследствии можно было исправить производственный код. Как должен вести себя образец сервлета? Можно ли воспроизвести это с помощью клиентской части Apache Commons HttpClient в том же тесте JUnit? (Это было бы здорово для написания интеграционного теста без сложного взлома сети, такого как kill -9.)

Я пробовал несколько вещей, чтобы реализовать образец асинхронного сервлета и клиента, но безуспешно. Я не думаю, что прикрепление этого кода слишком сильно поможет, но я могу это сделать, если кому-то интересно.

Версия причала: 9.4.8.v20171121

обновление (2018-06-27):

Размышляя над полезным ответом @Joakim Erdfelt, я не нашел ни одного вызова close() в нашем коде, но обнаружил подозрительную отсутствующую синхронизацию. Вот основа нашего сервлета асинхронного опроса:

public class QueuePollServlet extends HttpServlet {

    public QueuePollServlet() {
    }

    @Override
    protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType(MediaType.OCTET_STREAM.type());
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.flushBuffer();
        final AsyncContext async = req.startAsync();
        async.setTimeout(30_000);
        final ServletOutputStream output = resp.getOutputStream();
        final QueueWriteListener writeListener = new QueueWriteListener(async, output);
        async.addListener(writeListener);
        output.setWriteListener(writeListener);
    }

    private static class QueueWriteListener implements AsyncListener, WriteListener {

        private final AsyncContext asyncContext;
        private final ServletOutputStream output;

        public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
            this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
            this.output = checkNotNull(output, "output cannot be null");
        }

        @Override
        public void onWritePossible() throws IOException {
            writeImpl();
        }

        private synchronized void writeImpl() throws IOException {
            while (output.isReady()) {
                final byte[] message = getNextMessage();
                if (message == null) {
                    output.flush();
                    return;
                }
                output.write(message);
            }
        }

        private void completeImpl() {
            asyncContext.complete();
        }

        public void dataArrived() {
            try {
                writeImpl();
            } catch (IOException e) {
                ...
            }
        }

        public void noMoreBuffers() {
            completeImpl();
        }

        @Override
        public void onTimeout(final AsyncEvent event) throws IOException {
            completeImpl();
        }

        @Override
        public void onError(final Throwable t) {
            logger.error("Writer.onError", t);
            completeImpl();
        }


        ...
    }
}

Вероятное состояние гонки:

  1. DataFeederThread: вызывает dataArrived() -> writeImpl(), затем получает, что output.isReady() равно true.
  2. Jetty вызывает onTimeout(), что завершает контекст.
  3. DataFeederThread: вызывает output.write() в цикле while, но находит завершенный контекст.

Может ли этот сценарий вызвать предупреждение Closed while Pending/Unready или это другая проблема? Я прав, создание completeImpl() synchronized решает проблему или есть что-то еще, о чем нужно заботиться?

обновление (2018-06-28):

У нас также есть аналогичная реализация onError в QueueWriteListener в виде следующего фрагмента:

@Override
public void onError(final Throwable t) {
    logger.error("Writer.onError", t);
    completeImpl();
}

Во всяком случае, нет журнала ошибок onError вокруг сообщения журнала Closed while Pending/Unready (с учетом двухчасового периода времени для каждого), только EOF, подобные следующим из нашего DataFeederThread:

DEBUG [DataFeederThread] (HttpOutput.java:224) - 
org.eclipse.jetty.io.EofException: null
        at org.eclipse.jetty.server.HttpConnection$SendCallback.reset(HttpConnection.java:704) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpConnection$SendCallback.access$300(HttpConnection.java:668) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpConnection.send(HttpConnection.java:526) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.sendResponse(HttpChannel.java:778) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.write(HttpChannel.java:834) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:234) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:218) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.flush(HttpOutput.java:392) [jetty-server.jar:9.4.8.v20171121]
        at com.example.QueuePollServlet$QueueWriteListener.writeImpl()
        at com.example.QueuePollServlet$QueueWriteListener.dataArrived()


DEBUG [DataFeederThread] (QueuePollServlet.java:217) - messageArrived exception
org.eclipse.jetty.io.EofException: Closed
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:476) ~[jetty-server.jar:9.4.8.v20171121]
        at com.example.QueuePollServlet$QueueWriteListener.writeImpl()
        at com.example.QueuePollServlet$QueueWriteListener.dataArrived()

person palacsint    schedule 25.06.2018    source источник
comment
@JoakimErdfelt: у нас есть onError с ведением журнала, но он не вызывается (я обновил вопрос с этим).   -  person palacsint    schedule 28.06.2018
comment
Кроме того, это не связано с вашим вопросом, но вам действительно следует обновить свою версию Причал используется в производстве. Вы также получите другие исправления ошибок и даже улучшения в кодовой базе, связанные с вашей проблемой. (Подсказка: закрытие обрабатывается по-разному из-за обнаружений в высоконагруженных системах HTTP/2)   -  person Joakim Erdfelt    schedule 28.06.2018
comment
Спасибо, @JoakimErdfelt! Я обновил наш Jetty.   -  person palacsint    schedule 29.06.2018


Ответы (2)


Предупреждение Closed while Pending/Unready можно воспроизвести с помощью ручной отладки и простого клиента curl. Я тестировал его с Jetty 9.4.8.v20171121 и Jetty 9.4.11.v20180605. Вам нужны две точки останова, поэтому надежно воспроизвести в тестовой среде непросто.

  1. Первая точка останова находится в методе HttpOutput.write() сразу после того, как он меняет свое состояние с READY на PENDING:

    case READY:
        if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
            continue;
        // put a breakpoint here
    
  2. Второй в Response.closeOutput():

    case STREAM:
        // put a breakpiont here
        if (!_out.isClosed())
            getOutputStream().close();
        break;
    

Действия по воспроизведению:

  1. [JettyThread1] Вызывает QueueWriteListener.onWritePossible(), который записывает несколько байтов в вывод, а затем возвращается (поскольку его входной буфер пуст).
  2. Дождитесь события onTimeout.
  3. [JettyThread2] HttpChannelState.onTimeout() вызывает QueueWriteListener.onTimeout(), который вызывает asyncContext.complete().
  4. [JettyThread2] HttpChannelState.onTimeout() планирует отправку по истечении асинхронного тайм-аута.
  5. [JettyThread2] Пауза во второй точке останова
  6. [ДПФ] DataFeederThread вызывает writeImpl()
  7. [DFT] DataFeederThread вызывает HttpOutput.write() (это выходной поток)
  8. [DFT] HttpOutput.write() меняет свое состояние с READY на PENDING
  9. [DFT] DataFeederThread здесь останавливается из-за точки останова выше
  10. [JettyThread2] Запланированная отправка закрывает выходной поток и выдает предупреждение Closed while Pending/Unready.

Итак, на самом деле именно Jetty закрывает выходной поток в этом (Jetty 9.4.8.v20171121) стеке:

Thread [jetty-19] (Suspended)   
    Response.closeOutput() line: 1043   
    HttpChannelOverHttp(HttpChannel).handle() line: 488 
    HttpChannelOverHttp(HttpChannel).run() line: 293    
    QueuedThreadPool.runJob(Runnable) line: 708 
    QueuedThreadPool$2.run() line: 626  
    Thread.run() line: 748  

Создание onTimeout() synchronized (а также writeImpl() также synchronized) в слушателе не помогает, так как запланированное закрытие все еще может перекрываться с writeImpl (из DataFeederThread). Рассмотрим этот случай:

  1. [JettyThread1] Вызывает QueueWriteListener.onWritePossible(), который записывает несколько байтов в вывод, а затем возвращается (поскольку его входной буфер пуст).
  2. Дождитесь события onTimeout.
  3. [JettyThread2] HttpChannelState.onTimeout() вызывает QueueWriteListener.onTimeout(), который вызывает asyncContext.complete().
  4. [DFT] DataFeederThread вызывает writeImpl() (он заблокирован, так как onTimeout еще не закончил)
  5. [JettyThread2] QueueWriteListener.onTimeout() завершено
  6. [ДПФ] writeImpl() может работать
  7. [JettyThread2] HttpChannelState.onTimeout() планирует отправку по истечении асинхронного тайм-аута.
  8. [JettyThread2] Пауза во второй точке останова
  9. [DFT] DataFeederThread вызывает HttpOutput.write() (это выходной поток)
  10. [DFT] HttpOutput.write() меняет свое состояние с READY на PENDING
  11. [DFT] DataFeederThread здесь останавливается из-за точки останова выше
  12. [JettyThread2] Запланированная отправка закрывает выходной поток и выдает предупреждение Closed while Pending/Unready.

К сожалению, после asnyContext.complete() недостаточно проверить output.isReady(). Он возвращает true, так как Jetty повторно открывает HttpOutput (см. стек ниже), поэтому вам нужен отдельный флаг для этого в слушателе.

Thread [jetty-13] (Suspended (access of field _state in HttpOutput))    
    HttpOutput.reopen() line: 195   
    HttpOutput.recycle() line: 923  
    Response.recycle() line: 138    
    HttpChannelOverHttp(HttpChannel).recycle() line: 269    
    HttpChannelOverHttp.recycle() line: 83  
    HttpConnection.onCompleted() line: 424  
    HttpChannelOverHttp(HttpChannel).onCompleted() line: 695    
    HttpChannelOverHttp(HttpChannel).handle() line: 493 
    HttpChannelOverHttp(HttpChannel).run() line: 293    
    QueuedThreadPool.runJob(Runnable) line: 708 
    QueuedThreadPool$2.run() line: 626  
    Thread.run() line: 748  

Кроме того, isReady() также возвращает true, когда выход все еще закрыт (перед перезапуском/повторным открытием). Связанный вопрос: isReady() возвращает true в закрытом состоянии - почему?

Окончательная реализация выглядит примерно так:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(4096);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(5_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(async, output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

private static class QueueWriteListener implements AsyncListener, WriteListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);

    private final AsyncContext asyncContext;
    private final ServletOutputStream output;

    @GuardedBy("this")
    private boolean completed = false;

    public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
        this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
        this.output = checkNotNull(output, "output cannot be null");
    }

    @Override
    public void onWritePossible() throws IOException {
        writeImpl();
    }

    private synchronized void writeImpl() throws IOException {
        if (completed) {
            return;
        }
        while (output.isReady()) {
            final byte[] message = getNextMessage();
            if (message == null) {
                output.flush();
                return;
            }
            output.write(message);
        }
    }

    private synchronized void completeImpl() {
        // also stops DataFeederThread to call bufferArrived
        completed = true;
        asyncContext.complete();
    }

    @Override
    public void onError(final Throwable t) {
        logger.error("Writer.onError", t);
        completeImpl();
    }

    public void dataArrived() {
        try {
            writeImpl();
        } catch (RuntimeException | IOException e) {
            ...
        }
    }

    public void noMoreData() {
        completeImpl();
    }

    @Override
    public synchronized void onComplete(final AsyncEvent event) throws IOException {
        completed = true; // might not needed but does not hurt
    }

    @Override
    public synchronized void onTimeout(final AsyncEvent event) throws IOException {
        completeImpl();
    }

    @Override
    public void onError(final AsyncEvent event) throws IOException {
        logger.error("onError", event.getThrowable());
    }

    ...
}

обновление 2018-08-01: на самом деле оно не устранило предупреждение полностью, см.: "Закрыто в ожидании/ Предупреждения о неготовности от Jetty

person palacsint    schedule 29.06.2018
comment
ВАУ! Какая потрясающая деталь. Можете ли вы зарегистрировать это как проблему Jetty? github.com/eclipse/jetty.project/issues — возможно, мы сможем найти лучший способ. даже если это просто делает ошибки / проблемы более ясными, когда они происходят. - person Joakim Erdfelt; 29.06.2018
comment
Спасибо! Думаю, я мог бы создать эту проблему, но, вероятно, только в понедельник. (Не стесняйтесь создать его, если у вас есть время.) - person palacsint; 29.06.2018

Вы используете асинхронный ввод-вывод из спецификации сервлета и вручную закрыли поток ответов.

Ключевой факт: вызов close подразумевает операцию записи.

В случае Jetty IOException сообщает вам, что ваш ручной вызов закрытия потока ответа вызвал нежелательный побочный эффект.

Таким образом, в режиме асинхронного ввода-вывода вызов ServletOutputStream.isReady() должно было быть выполнено до использования ServletOutputStream.close() для проверки истинности isReady().

Обратите внимание, что может быть желательно разрешить ServletOutputStream.close() в любое время, особенно если AsyncContext.complete().

Однако, если предыдущая запись еще не завершена и/или ServletOutputStream.isReady() не был вызван, этот конкретный случай голого ServletOutputStream.close() разрешен, но поведение состоит в том, чтобы прервать ответ, отбрасывая все ожидающие/незаписанные записи в ServletOutputStream.

Вот почему вы получаете IOException("Closed while Pending/Unready")

Теперь вы должны спросить себя, почему вы закрываете поток ответов? Это не требуется в соответствии со спецификацией сервлета и нежелательно, если вы когда-либо хотели использовать RequestDispatcher для объединения нескольких ответов сервлетов вместе.

person Joakim Erdfelt    schedule 27.06.2018
comment
Спасибо за ответ! Думаю, я что-то нашел, не могли бы вы проверить обновление в вопросе, пожалуйста? - person palacsint; 27.06.2018
comment
Спасибо за полезную подсказку, что кто-то закрывает поток. Оказалось, что его закрывает Джетти. Я смог отладить, воспроизвести и исправить ошибку. (Подробности в моем ответе ниже.) - person palacsint; 29.06.2018