Chronicle Queue замедляется и заканчивается память

Я оцениваю Chronicle Queue для использования в нашем программном обеспечении и, должно быть, делаю что-то не так.
У меня есть приложение, которое очень быстро записывает около 650 тысяч записей. После этого он останавливается, и в этот момент память увеличивается до максимально допустимого уровня и в конечном итоге достигает OutOfMemory.

Вот мой код:

final class LogEntryOutput implements WriteBytesMarshallable
{
  private final int maxMessageSize;
  private TLogEntry logEntry;

  LogEntryOutput(final int maxMessageSize)
  {
    this.maxMessageSize = maxMessageSize;
  }

  public void setMarshallable(final TLogEntry logEntry)
  {
    this.logEntry = logEntry;
  }

  @Override
  @SuppressWarnings({"rawtypes", "No way to provide generic type and override WriteBytesMarshallable."})
  public void writeMarshallable(final BytesOut bytes)
  {
    bytes.writeLong(this.logEntry.getSessionId());
    bytes.writeInt(this.logEntry.getLogLevel());
    bytes.writeInt(this.logEntry.getSecurityLevel());
    bytes.writeLong(this.logEntry.getPosixTimestamp());

    // Limit size of string messages.
    final int messageSize = Math.min(this.logEntry.getMessage().length(), this.maxMessageSize);

    // Write message length
    bytes.writeStopBit((long)messageSize);

    // Write message bytes.
    bytes.write(this.logEntry.getMessage(), 0, messageSize);
  }
}    

final TLogEntry entry = new TLogEntry();
entry.setSessionId(321234L);
entry.setLogLevel(77);
entry.setSecurityLevel(1234);
entry.setPosixTimestamp(6141234321L);
entry.setMessage("This is a test message for the system................................ A");

final LogEntryOutput output = new LogEntryOutput(1024);
output.setMarshallable(entry);

final ChronicleQueue queue = SingleChronicleQueueBuilder.binary(config.getQueueDirectory())
  .rollCycle(RollCycles.HOURLY)
  .build();
final ExcerptAppender appender = queue.acquireAppender();

for (int j = 0; j < 100; ++j)
{
  for (int i = 0; i < 10000; ++i)
  {
    appender.writeBytes(output);
  }

  System.out.println((j+1) * 10000);
  Jvm.pause(100L);
}

queue.close();

Это работает в Windows 7 x64 с 64-битной JVM с использованием: -Xmx1024m
Есть идеи, что я могу делать неправильно?

РЕДАКТИРОВАТЬ: у меня есть дополнительная информация. Я сделал снимок распределения объектов сразу после всплеска памяти. Много массивов объектов и тому подобного. введите здесь описание изображения введите описание изображения здесь И трассировка стека, когда я получаю ошибку OOM.

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.ConcurrentLinkedQueue.offer(ConcurrentLinkedQueue.java:328)
    at java.util.concurrent.ConcurrentLinkedQueue.add(ConcurrentLinkedQueue.java:297)
    at net.openhft.chronicle.core.ReferenceCounter.recordRelease(ReferenceCounter.java:88)
    at net.openhft.chronicle.core.ReferenceCounter.release(ReferenceCounter.java:79)
    at net.openhft.chronicle.bytes.NativeBytesStore.release(NativeBytesStore.java:267)
    at net.openhft.chronicle.bytes.MappedBytes.acquireNextByteStore(MappedBytes.java:186)
    at net.openhft.chronicle.bytes.MappedBytes.peekVolatileInt(MappedBytes.java:388)
    at net.openhft.chronicle.wire.AbstractWire.readMetaDataHeader(AbstractWire.java:222)
    at net.openhft.chronicle.queue.impl.single.SCQIndexing.arrayForAddress(SCQIndexing.java:190)
    at net.openhft.chronicle.queue.impl.single.SCQIndexing.sequenceForPosition(SCQIndexing.java:492)
    at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore.sequenceForPosition(SingleChronicleQueueStore.java:272)
    at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.checkWritePositionHeaderNumber(SingleChronicleQueueExcerpts.java:339)
    at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.writingDocument(SingleChronicleQueueExcerpts.java:267)
    at net.openhft.chronicle.wire.MarshallableOut.writingDocument(MarshallableOut.java:55)
    at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.writeBytes(SingleChronicleQueueExcerpts.java:117)
    at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumers$LogEntryChronicle.accept(LogEntryConsumers.java:78)
    at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumers$LogEntryChronicle.accept(LogEntryConsumers.java:45)
    at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumersTest.test(LogEntryConsumersTest.java:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:86)
    at org.testng.internal.Invoker.invokeMethod(Invoker.java:643)
    at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:820)
    at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1128)
    at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
    at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
    at org.testng.TestRunner.privateRun(TestRunner.java:782)
    at org.testng.TestRunner.run(TestRunner.java:632)
    at org.testng.SuiteRunner.runTest(SuiteRunner.java:366)
    at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:361)

person akagixxer    schedule 18.05.2017    source источник
comment
Сколько у вас оперативной памяти? Каковы точные сообщения OOME, поскольку я не думаю, что у вас заканчивается куча.   -  person Peter Lawrey    schedule 19.05.2017
comment
У меня сейчас нет с собой машины. Я опубликую трассировку стека, когда смогу. Но у машины есть, по-моему, 8Гб. Я бы не ожидал, что буду использовать так много памяти, когда я даже не генерирую столько данных. Примерно до 650 тыс. записей (для меня) он работает с кучей менее 50 МБ, но затем VisualVM говорит, что куча увеличивается и заканчивается.   -  person akagixxer    schedule 19.05.2017
comment
Я в растерянности... Все работает нормально, если я выполняю этот код из командной строки или как приложение в Intellij (без всплеска памяти, и все 1 миллион записей занимают ‹ 500 мс). НО, если я запускаю его как модульный тест в среде TestNG, я вижу это поведение.   -  person akagixxer    schedule 19.05.2017


Ответы (1)


В Chronicle Queue есть несколько дополнительных проверок для обнаружения утечек памяти, которые включаются с помощью -ea. Если вы запускаете с этими дополнительными проверками, в вашем случае очередь замедляет около 90 000 сообщений. Если вы отключите утверждения, вы заставите его работать намного дольше.

Это достигает 10 000 000 записей за 5,5 секунд на ноутбуке Windows с 8 ГБ памяти и с отключенными утверждениями.

Он также делает 100 миллионов записей за 66 секунд.

public class ATest {
    static class TLogEntry {

        private long sessionId;
        private int logLevel;
        private int securityLevel;
        private long posixTimestamp;
        private CharSequence message;

        public long getSessionId() {
            return sessionId;
        }

        public void setSessionId(long sessionId) {
            this.sessionId = sessionId;
        }

        public int getLogLevel() {
            return logLevel;
        }

        public void setLogLevel(int logLevel) {
            this.logLevel = logLevel;
        }

        public int getSecurityLevel() {
            return securityLevel;
        }

        public void setSecurityLevel(int securityLevel) {
            this.securityLevel = securityLevel;
        }

        public long getPosixTimestamp() {
            return posixTimestamp;
        }

        public void setPosixTimestamp(long posixTimestamp) {
            this.posixTimestamp = posixTimestamp;
        }

        public CharSequence getMessage() {
            return message;
        }

        public void setMessage(CharSequence message) {
            this.message = message;
        }
    }

    static class LogEntryOutput implements WriteBytesMarshallable {
        private final int maxMessageSize;
        private TLogEntry logEntry;

        LogEntryOutput(final int maxMessageSize) {
            this.maxMessageSize = maxMessageSize;
        }

        public void setMarshallable(final TLogEntry logEntry) {
            this.logEntry = logEntry;
        }

        @Override
        @SuppressWarnings({"rawtypes", "No way to provide generic type and override WriteBytesMarshallable."})
        public void writeMarshallable(final BytesOut bytes) {
            bytes.writeLong(this.logEntry.getSessionId());
            bytes.writeInt(this.logEntry.getLogLevel());
            bytes.writeInt(this.logEntry.getSecurityLevel());
            bytes.writeLong(this.logEntry.getPosixTimestamp());

            // Limit size of string messages.
            final int messageSize = Math.min(this.logEntry.getMessage().length(), this.maxMessageSize);

            // Write message length
            bytes.writeStopBit((long) messageSize);

            // Write message bytes.
            bytes.write(this.logEntry.getMessage(), 0, messageSize);
        }
    }

    @Test
    public void test() {
        final TLogEntry entry = new TLogEntry();
        entry.setSessionId(321234L);
        entry.setLogLevel(77);
        entry.setSecurityLevel(1234);
        entry.setPosixTimestamp(6141234321L);
        entry.setMessage("This is a test message for the system................................ A");

        final LogEntryOutput output = new LogEntryOutput(1024);
        output.setMarshallable(entry);

        final ChronicleQueue queue = SingleChronicleQueueBuilder.binary(
                OS.TARGET + "/test-" + System.nanoTime())
                .rollCycle(RollCycles.HOURLY)
                .build();
        final ExcerptAppender appender = queue.acquireAppender();
        Jvm.setExceptionHandlers(Slf4jExceptionHandler.FATAL, Slf4jExceptionHandler.WARN, Slf4jExceptionHandler.WARN);
        for (int j = 0; j < 1000; ++j) {
            for (int i = 0; i < 10000; ++i) {
                appender.writeBytes(output);
            }

            System.out.println((j + 1) * 10000);
            // Jvm.pause(100L);
        }

        queue.close();
    }
}
person Peter Lawrey    schedule 19.05.2017
comment
Я понимаю. Спасибо за быстрый ответ и за проведение теста, чтобы помочь! Мы включили -ea по умолчанию при запуске модульных тестов. Я удалил -ea и выполнил без утверждений, и это решило проблему! Какой бы механизм ни отслеживал утечки памяти, похоже, он потребляет много памяти. - person akagixxer; 19.05.2017
comment
@akagixxer Chronicle Core 1.8.1-SNAPSHOT был исправлен, поэтому этот тест может создавать сообщения размером 100 M за 15 секунд с включенными утверждениями. - person Peter Lawrey; 20.05.2017
comment
Спасибо, но я не думаю, что у меня есть доступ к этому выпуску. Я использовал последнюю сборку от maven. - person akagixxer; 20.05.2017