Chronicle Queue Чтение/запись из очереди с использованием нулевого распределения

Я новый пользователь очереди хроник и хочу использовать стратегию нулевого распределения для чтения и записи объектов из очереди хроник. Я хочу использовать очередь и маршалируемую реализацию байтов, например класс pojo, является ли это правильной стратегией? Я не нашел ничего, что могло бы мне помочь. Я пытаюсь сделать экземпляр класса сообщений ниже для добавления и чтения содержимого из очереди. Когда я пытаюсь читать с более чем одним потоком, я всегда получаю ошибку памяти.

public class Message implements BytesMarshallable{
    private byte[] text;
    private long timeStamp;
    
    public Message(){}

    //Getters and Setters
}

Я пытался читать с помощью tailer.readDocument, когда я получил ошибку памяти с более чем одним потоком


person Domenico Schettini Filho    schedule 24.01.2021    source источник


Ответы (1)


Использование byte[] требует выделения нового каждый раз при изменении размера. Вы не можете изменить размер byte[].

Вместо этого я предлагаю использовать класс Bytes, размер которого можно изменять, не создавая большого количества мусора (только иногда, когда он растет).

package run.chronicle.queue;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.wire.BytesInBinaryMarshallable;
import net.openhft.chronicle.wire.LongConversion;
import net.openhft.chronicle.wire.MilliTimestampLongConverter;

public class Message extends BytesInBinaryMarshallable {
    private final Bytes text = Bytes.allocateElasticOnHeap();

    @LongConversion(MilliTimestampLongConverter.class)
    private long timeStamp;

    //Getters and Setters


    public Bytes getText() {
        return text;
    }

    public void setText(CharSequence text) {
        this.text.clear().append(text);
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }
}

См. этот пример https://github.com/OpenHFT/Chronicle-Queue-Demo/tree/master/messages-with-text

При запуске с небольшой кучей -Xmx128m -XX:NewSize=96m -verbose:gc MessageMain вы можете видеть, что миллионы сообщений не вызывают никаких сборов.

Read 10,000,000 of 10,000,000 messages in 7.990 seconds
Read 10,000,000 of 10,000,000 messages in 6.907 seconds
[main] INFO net.openhft.chronicle.bytes.MappedFile - Took 2 ms to add mapping for test-438402668456/metadata.cq4t
Read 10,000,000 of 10,000,000 messages in 6.836 seconds
[main] INFO net.openhft.chronicle.bytes.MappedFile - Took 2 ms to add mapping for test-445239126125/metadata.cq4t
Read 10,000,000 of 10,000,000 messages in 6.883 seconds
[main] INFO net.openhft.chronicle.bytes.MappedFile - Took 3 ms to add mapping for test-452122895277/metadata.cq4t
Read 10,000,000 of 10,000,000 messages in 7.013 seconds
Read 10,000,000 of 10,000,000 messages in 6.838 seconds
[main] INFO net.openhft.chronicle.bytes.MappedFile - Took 2 ms to add mapping for test-465974753213/metadata.cq4t
person Peter Lawrey    schedule 25.01.2021
comment
Привет, Петр, спасибо за помощь. Наилучшая производительность достигается в этом примере с использованием байтов, я имею в виду, что хочу избежать GC Runs, как вы упомянули, и у меня есть максимальный размер буфера. С этим я могу, например, получить лучшую производительность для чтения и записи в очереди хроники, используя BytesInBinaryMarshallable и двоичную очередь? - person Domenico Schettini Filho; 25.01.2021
comment
@DomenicoSchettiniFilho Размер Bytes будет изменяться по мере необходимости, но вы можете указать определенный размер для его настройки. Я не уверен, что вы вопрос, вы можете перефразировать его? - person Peter Lawrey; 25.01.2021
comment
Обязательно Питер. Я пытаюсь получить наилучшую производительность для записи и чтения из очереди хроники с нулевым gc, если это возможно, или оставаться на минимуме. Мой класс, как я описываю, имеет несколько текстовых полей и примитивов, таких как метка времени. Поскольку у меня есть максимальный размер буфера, я могу настроить gc без изменения размера байтов. Кроме того, по вашему мнению, производительность чтения/записи из очереди с этим решением хорошая? - person Domenico Schettini Filho; 25.01.2021
comment
@DomenicoSchettiniFilho Вы можете видеть, что накладные расходы в этом случае составляют около 0,3 микросекунды, есть несколько более быстрые варианты, но с ними сложнее работать, поэтому я бы начал с этого. - person Peter Lawrey; 25.01.2021
comment
Хороший Питер. Если у вас есть ссылка на более быстрые варианты, я тоже попробую. В любом случае я бы проверил больше вариантов, даже если это сложнее. Спасибо вам за помощь. - person Domenico Schettini Filho; 25.01.2021
comment
@DomenicoSchettiniFilho Самые большие накладные расходы - это ваш текст, какова длина строки, если вы можете закодировать ее, скажем, с помощью Base85 (см. Пример кодировщика выше), вы можете объединить 10-символьную строку в длинную и еще больше сократить задержку. - person Peter Lawrey; 26.01.2021
comment
Привет @PeterLawrey. Большое спасибо за совет, возможно, использование этого поля в двоичном виде тоже поможет. Помимо этой стратегии, я мог бы, например, получить значения непосредственно из памяти, может ли это как-то улучшить задержку? Как вы упомянули, большие накладные расходы в этом решении - это текст. Буду работать в альтернативе. - person Domenico Schettini Filho; 26.01.2021
comment
@DomenicoSchettiniFilho Вы можете посмотреть Chronicle Values, который поддерживает доступ к полям фиксированной длины без необходимости чтения всего объекта. - person Peter Lawrey; 26.01.2021
comment
Привет @PeterLawrey, спасибо за совет. Я проверяю значения хроники и, безусловно, это хороший вариант для ограничения размера полей. У меня есть еще один вопрос о записи/чтении в очереди хроники, способе иметь несколько слушателей и авторов в разных потоках, таких как события? Я хочу иметь несколько читателей, которые читают только в том случае, если в очереди выполняется новая запись, но читают в разных потоках. Это возможно? - person Domenico Schettini Filho; 27.01.2021
comment
@DomenicoSchettiniFilho, у вас может быть любое количество читателей/писателей для одной и той же очереди. Чтобы читатель мог проверить, есть ли что читать, ему необходимо опросить очередь. - person Peter Lawrey; 28.01.2021
comment
Чтобы использовать несколько потоков, лучше всего использовать такие события, как methodReader и methodWriter? И поскольку мы видим, что наихудшие накладные расходы, используемые в этом классе, — это текстовое поле, вы упоминали об использовании Base85 ранее, но, например, если у меня есть длинный текст (к сожалению, не в двоичной форме), используя базу 85 или даже 64, это улучшит чтение и время записи, но в случае сортировки и десортировки полей. попробую сравнить. Спасибо Петру за помощь. - person Domenico Schettini Filho; 29.01.2021
comment
@DomenicoSchettiniFilho Bytes — лучший вариант для более длинных строк. - person Peter Lawrey; 29.01.2021
comment
Хорошо, я использую Bytes @PeterLawrey. Я реорганизую класс для этого: public class Message extends BytesInBinaryMarshallable { private final Bytes<ByteBuffer> text; private long timeStamp; //Getters and Setters } Я пока не использую methodReader или methodWriter для записи/чтения из очереди. На самом деле я использую readDocument и writeDocument. Таким образом, это нормально или есть лучший вариант? - person Domenico Schettini Filho; 29.01.2021
comment
Привет, у @PeterLawrey есть способ чтения/записи с большей производительностью, потому что в моих тестах, как мы говорили, узким местом для меня является текстовое поле, но я ищу другие варианты чтения и записи. Потому что сейчас я использую следующие методы для чтения/записи: readDocument и writeDocument. Я пытаюсь распараллелить создание нескольких читателей, используя локальный поток для ExcerptTailer с разными именами. При таком подходе я перекрываю индексы tailer. Я не нашел способа параллельного чтения tailer разных индексов. На ваш взгляд, это может быть хорошим улучшением? - person Domenico Schettini Filho; 30.01.2021
comment
@DomenicoSchettiniFilho, чтобы сделать хвостовиков параллельными, вам нужно несколько потоков. У вас может быть любое количество процессов/потоков, одновременно читающих очередь без накладных расходов. - person Peter Lawrey; 31.01.2021
comment
В порядке. @PeterLawrey У меня плохое время чтения и записи из очереди. На самом деле у меня есть ограничение на текстовое поле. у него есть луч для чтения и записи с большей производительностью, возможно, с использованием необработанных данных или чтения непосредственно из памяти. Теперь я использую BytesInBinaryMarshallable для записи и чтения объекта. - person Domenico Schettini Filho; 31.01.2021
comment
@DomenicoSchettiniFilho Мы поддерживаем List<String> и String[] проблему, с которой вы столкнулись. - person Peter Lawrey; 01.02.2021
comment
Привет @PeterLawrey. На самом деле я использую три поля байтов, и я получаю в самых высоких процентилях более высокое число от 10 до 25 нас на запись в очередь и чтение из очереди. Я объявляю эти поля с начальным размером, как я указал в примере, но даже с таким подходом я не смог улучшить производительность. - person Domenico Schettini Filho; 01.02.2021
comment
Я думаю, что, возможно, исправление максимального размера может немного улучшить производительность, потому что в начале я поставил 2048 в качестве размера байтов. - person Domenico Schettini Filho; 03.02.2021
comment
@DomenicoSchettiniFilho Установка размера может немного помочь, однако, как только он вырастет до необходимого размера, он больше не будет расти. - person Peter Lawrey; 05.02.2021