Медленный сборщик очереди в многопоточной очереди добавления

У меня есть сценарий, когда несколько потоков записываются в одну и ту же очередь.

Потоки Appender получают обновления с разных рынков (каждый поток — отдельный рынок) и помещают эти данные в одну и ту же очередь:

ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
        final ExcerptTailer tailer = queue.createTailer();
appender.writeDocument(
                        wire -> {

                                wire
                                        .getValueOut().text("buy")
                                        .getValueOut().text(exchange.name())
                                        .getValueOut().text(currencyPair.toString())
                                        .getValueOut().dateTime(LocalDateTime.now(Clock.systemUTC()))
                                        .getValueOut().text(price);
                            });

Затем у меня есть совершенно отдельный процесс (другая JVM) для непрерывного чтения из очереди, выполнив:

while (true){
     tailer.readDocument(........

Но пока я генерирую около 10 обновлений в очередь в секунду, хвостовик обрабатывает примерно одну запись за 3 секунды. Я думаю, что мне не хватает чего-то фундаментального здесь :-)

Или как правильно постоянно прослушивать обновления в очереди? Я не смог найти другого решения, кроме как while (true), затем сделать...

Я разрабатываю на 18-ядерной машине (36 потоков) и использую Java Affinity для назначения каждой работы собственному процессору.

Спасибо за любые подсказки.


person kensai    schedule 09.01.2018    source источник


Ответы (1)


Создание очереди очень затратно, попробуйте сделать это только один раз для каждого процесса, если можете.

Создание Tailer также дорого стоит, вы должны создать его один раз и продолжать опрашивать об обновлениях.

Создание объектов может быть дорогим, я бы не стал создавать какие-либо объекты. например не звоните toString или LocalDate.now

Вот пример бенчмаркинга

String path = OS.getTarget();
ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
ExcerptAppender appender = queue.acquireAppender();
Exchange exchange = Exchange.EBS;
CurrencyPair currencyPair = CurrencyPair.EURUSD;
double price = 1.2345;
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = appender.writingDocument()) {
            ValueOut valueOut = dc.wire().getValueOut();
            valueOut.text("buy")
                    .getValueOut().asEnum(exchange)
                    .getValueOut().asEnum(currencyPair)
                    .getValueOut().int64(System.currentTimeMillis())
                    .getValueOut().float64(price);
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
    Jvm.pause(100);
}

отпечатки

Throughput was 962,942 messages per second
Throughput was 2,952,433 messages per second
Throughput was 4,776,337 messages per second
Throughput was 3,250,235 messages per second
Throughput was 3,514,863 messages per second

А для чтения можно сделать

final ExcerptTailer tailer = queue.createTailer();
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = tailer.readingDocument()) {
            if (!dc.isPresent())
                throw new AssertionError("Missing t: " + t + ", i: " + i);
            ValueIn in = dc.wire().getValueIn();
            String buy = in.text();
            Exchange exchange2 = in.asEnum(Exchange.class);
            CurrencyPair currencyPair2 = in.asEnum(CurrencyPair.class);
            long time = in.int64();
            double price2 = in.float64();
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Read Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
}

примечание: он читает столько же сообщений, сколько было написано.

отпечатки

Read Throughput was 477,849 messages per second
Read Throughput was 3,083,642 messages per second
Read Throughput was 5,100,516 messages per second
Read Throughput was 6,342,525 messages per second
Read Throughput was 6,672,971 messages per second
person Peter Lawrey    schedule 09.01.2018
comment
Привет, Питер, прежде всего спасибо за твой замечательный материал. Спасибо за кусок кода. Проблема, которую я вижу, в хвосте. Короче говоря, я не вижу в хвосте все сообщения, которые я помещаю в очередь через потоки приложения (или я нахожусь под впечатлением в данный момент), мне кажется, что один раз за раз я получаю последнюю доступную, возможно ли это или даже стандартное поведение (своего рода конфигурация хвостореза)? Я использую 4.6.61 версию очереди. У меня просто гораздо больше добавлений, чем более поздние хвостовые события в цифрах, но мне нужно полностью проверить дамп очереди, но количество событий RX/TX совершенно другое. - person kensai; 09.01.2018
comment
В любом случае, когда я смотрю на openhft, вы смертоносный воин высокой производительности :-), еще раз спасибо за то, что такой замечательный проект стал общедоступным. - person kensai; 09.01.2018
comment
@kensai Я добавил пример чтения, который читает все сообщения со скоростью до 6,6 м/с. Не видя вашего кода, я не могу понять, что не так. - person Peter Lawrey; 09.01.2018
comment
Великолепно, мой код действительно использует просто чистое время (true) {tailer.readDocument} в бесконечном цикле, как это было предложено в исходном утверждении. Я вообще не использовал объект DocumentContext. Я собираюсь попробовать. На самом деле я использовал следующий пример кода: github.com/OpenHFT/Chronicle-Queue-Sample/blob/master/ - person kensai; 10.01.2018
comment
вы считаете, что что-то вроде @Producer(имя очереди, местоположение или конфигурация) @Consumer(имя очереди, местоположение или конфигурация) или что-то подобное должно быть реализовано в основе, поэтому я могу работать с очередью хроники, как с асинхронной брокерской системой. - person kensai; 17.01.2018
comment
@kensai мы выступаем за поддержку асинхронных API как более естественного способа написания кода на Java. vanilla-java. github.io/2016/03/24/ - person Peter Lawrey; 17.01.2018