Очистка BufferedWriter на основе тайм-аута

Я использую BufferedWriter с размером по умолчанию 8192 символов для записи строк в локальный файл. Строки считываются из входного потока сокета с помощью метода BufferedReader readLine, блокируя ввод-вывод.

Средняя длина строки составляет 50 символов. Все это работает хорошо и достаточно быстро (более 1 млн строк в секунду), однако, если клиент перестанет писать, строки, которые сейчас хранятся в буфере BufferedWriter, не будут сброшены на диск. Фактически буферизованные символы не будут сброшены на диск до тех пор, пока клиент не возобновит запись или соединение не будет закрыто. Это приводит к задержке между передачей строки времени клиентом и временем фиксации этой строки в файле, поэтому задержка с длинным хвостом увеличивается.

Есть ли способ сбросить неполный буфер BufferedWriter по тайм-ауту, например. в течение 100 миллисекунд?


person Sergei Rodionov    schedule 01.06.2015    source источник
comment
Можете ли вы просто явно сбросить BufferedWriter после каждой написанной строки?   -  person copeg    schedule 02.06.2015
comment
Я тестировал построчную промывку. Учитывая, что каждая строка занимает всего 50 байт (маленькая полезная нагрузка на сброс), пропускная способность падает вдвое. Что еще хуже, он создает дисковый ввод-вывод и, таким образом, замедляет другие потоки, записывающие файлы.   -  person Sergei Rodionov    schedule 02.06.2015
comment
Достаточно честно, и не для того, чтобы бить дохлую лошадь — но ощутим ли этот эффект с точки зрения пользователя? Цитируя Дональда Кнута: Программисты тратят огромное количество времени, думая или беспокоясь о скорости некритических частей своих программ... преждевременная оптимизация - корень всех зол   -  person copeg    schedule 02.06.2015


Ответы (4)


Как насчет чего-то подобного? Это не настоящий BufferedWriter, а Writer. Он работает, периодически проверяя последний модуль записи на базовом, надеюсь, небуферизованном модуле записи, а затем сбрасывая BufferedWriter, если время ожидания превышает время ожидания.

public class PeriodicFlushingBufferedWriter extends Writer {

  protected final MonitoredWriter monitoredWriter;
  protected final BufferedWriter writer;

  protected final long timeout;
  protected final Thread thread;

  public PeriodicFlushingBufferedWriter(Writer out, long timeout) {
    this(out, 8192, timeout);
  }

  public PeriodicFlushingBufferedWriter(Writer out, int sz, final long timeout) {
    monitoredWriter = new MonitoredWriter(out);
    writer = new BufferedWriter(monitoredWriter, sz);

    this.timeout = timeout;

    thread = new Thread(new Runnable() {
      @Override
      public void run() {
        long deadline = System.currentTimeMillis() + timeout;
        while (!Thread.interrupted()) {
          try {
            Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0));
          } catch (InterruptedException e) {
            return;
          }

          synchronized (PeriodicFlushingBufferedWriter.this) {
            if (Thread.interrupted()) {
              return;
            }

            long lastWrite = monitoredWriter.getLastWrite();

            if (System.currentTimeMillis() - lastWrite >= timeout) {
              try {
                writer.flush();
              } catch (IOException e) {
              }
            }

            deadline = lastWrite + timeout;
          }
        }
      }
    });

    thread.start();
  }

  @Override
  public synchronized void write(char[] cbuf, int off, int len) throws IOException {
    this.writer.write(cbuf, off, len);
  }

  @Override
  public synchronized void flush() throws IOException {
    this.writer.flush();
  }

  @Override
  public synchronized void close() throws IOException {
    try {
      thread.interrupt();
    } finally {
      this.writer.close();
    }
  }

  private static class MonitoredWriter extends FilterWriter {

    protected final AtomicLong lastWrite = new AtomicLong();

    protected MonitoredWriter(Writer out) {
      super(out);
    }

    @Override
    public void write(int c) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(c);
    }

    @Override
    public void write(char[] cbuf, int off, int len) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(cbuf, off, len);
    }

    @Override
    public void write(String str, int off, int len) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(str, off, len);
    }

    @Override
    public void flush() throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.flush();
    }

    public long getLastWrite() {
      return this.lastWrite.get();
    }
  }
}
person David Ehrmann    schedule 01.06.2015
comment
Я собираюсь попробовать, спасибо. Другой вариант - хранить входящие строки в моем собственном буфере и сбрасывать их по тайм-ауту с другим потоком watcher. - person Sergei Rodionov; 02.06.2015

@copeg прав - сбрасывать его после каждой строки. Его легко сбросить во времени, но какой смысл иметь только половину записи и не иметь возможности ее продолжить?

person Alex    schedule 01.06.2015

Здесь вы можете применить шаблоны Observer, Manager и Factory, а центральный BufferedWriterManager будет создавать ваши BufferedWriters и поддерживать список активных экземпляров. Внутренний поток может периодически просыпаться и очищать активные экземпляры. Это также может быть возможностью для слабых ссылок, поэтому вашим потребителям не требуется явно освобождать объект. Вместо этого всю работу выполнит сборщик мусора, а вашему менеджеру просто нужно обработать случай, когда его внутренняя ссылка станет нулевой (т. е. когда все сильные ссылки будут удалены).

person jatal    schedule 01.06.2015
comment
Является ли flush() потокобезопасным? Потому что вполне возможно, что BufferedWriter вызовет подразумеваемую очистку и в то же время будет очищен другим потоком? Ни BufferedWriter, ни javadocs FileWriter не упоминают об этом, и поэтому я склонен предполагать, что это не потокобезопасно. - person Sergei Rodionov; 02.06.2015

Не пытайтесь использовать эту сложную схему, это слишком сложно. Просто уменьшите размер буфера, указав его при построении BufferedWriter. Уменьшите его, пока не найдете баланс между производительностью и задержкой, который вам нужен.

person user207421    schedule 01.06.2015
comment
У меня есть добровольный контракт, который нужно выполнить, сообщение, полученное сервером, должно быть сохранено в файле в течение 100 миллисекунд. Если я уменьшу размер буфера, я не смогу этого сделать, потому что неполный буфер не будет очищен, если клиент перестанет писать новые строки. - person Sergei Rodionov; 02.06.2015