Очередь хроники: StoreFileListener несколько onAcquired и onReleased

Я использую Хронику 4.5.27.

Ниже представлена ​​простая реализация Writer и Reader с помощью StoreFileListner. В Reader я получаю несколько событий onAcquired и onReleased.

Почему это произойдет? Я ожидаю получить только одно получение (когда файл получен для чтения) и одно освобождение (после завершения чтения).

В приведенных ниже журналах для Reader вы можете увидеть несколько событий onAcquired и onReleased.

Обратите внимание, что это поведение является случайным. Также обратите внимание, что Writer был намеренно замедлен с помощью Jvm.pause, чтобы имитировать реальную систему, в которой данные могут быть недоступны постоянно.

import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class ChronicleFactory {
    public SingleChronicleQueue createChronicle(String instance, String persistenceDir, RollCycles rollCycles) {
        SingleChronicleQueue chronicle = null;
        try {
            chronicle = SingleChronicleQueueBuilder.binary(persistenceDir).rollCycle(rollCycles).storeFileListener(new StoreFileListener() {
                @Override
                public void onReleased(int i, File file) {
                    String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy‌​-MM-dd HH:mm:ss.SSS"));
                    System.out.println(currentTime + ": " + Thread.currentThread().getName() + " onReleased called for file: " + file.getAbsolutePath() + " for cycle: " + i);
                }
                @Override
                public void onAcquired(int cycle, File file) {
                    String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy‌​-MM-dd HH:mm:ss.SSS"));
                    System.out.println(currentTime + ": " + Thread.currentThread().getName() + " onAcquired called for file: " + file.getAbsolutePath() + " for cycle: " + cycle);
                }
            }).build();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return chronicle;
    }
}

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import org.apache.commons.lang3.RandomStringUtils;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

public class MarketDataWriter {
    private static AtomicLong dataSeq = new AtomicLong();
    private static long longSequence = 0;
    private static int intSequence = 0;

    public static void main(String args[]) {
        String path = "C:\\Logs\\ChronicleData\\marketdata";
        writeMarketData(path);
    }

    private static void writeMarketData(String path) {
        ChronicleFactory chronicleFactory = new ChronicleFactory();
        SingleChronicleQueue chronicle = chronicleFactory.createChronicle("MD", path, RollCycles.MINUTELY);

        ExcerptAppender appender = chronicle.acquireAppender();

        while (true) {
            Jvm.pause(100); //NOTE: Slowing down writer to understand file rolling
            appender.writeBytes(b -> {
                b.writeLong(getLongSequence());
                b.writeInt(getIntSequence());
            });
        }
    }

    private static long getLongSequence() {
        return longSequence++;
    }

    private static int getIntSequence() {
        return intSequence++;
    }
}

import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleMarketDataReader {

    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public static void main(String args[]) {
        String pathForMarketData = "C:\\Logs\\ChronicleData\\marketdata";
        readMarketData(pathForMarketData);
    }

    public static void readMarketData(String pathForMarketDataFile) {
        ChronicleFactory chronicleFactory = new ChronicleFactory();
        SingleChronicleQueue chronicle = chronicleFactory.createChronicle("Reader", pathForMarketDataFile, RollCycles.MINUTELY);

        //Create another thread to read same file
        SimpleMarketDataReaderNewChronicle simpleMarketDataReaderNewChronicle = new SimpleMarketDataReaderNewChronicle();
        executor.submit(simpleMarketDataReaderNewChronicle);

        ExcerptTailer tailer = chronicle.createTailer();
        try {
            while (true) {
                tailer.readBytes(b -> {
                    b.readLong();
                    b.readInt();
                    //System.out.println("Long Sequence in SimpleMarketDataReader: " + b.readLong());
                    //System.out.println("User data is: " + userData);
                    //System.out.println("Int Sequence is: " + b.readInt());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Вывод записи:

2018‌​-01-03 09:36:00.079: main onAcquired вызван для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4для цикла: 25249536

2018‌​-01-03 09:37:00.098: main onReleased вызван для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4для цикла: 25249536

Вывод для чтения:

2018‌​-01-03 09:36:00.065: main onAcquired вызван для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4для цикла: 25249536

2018‌​-01-03 09:36:00.075: main onReleased вызван для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4для цикла: 25249536

2018‌​-01-03 09:36:00.078: main onAcquired вызван для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4для цикла: 25249536

2018‌​-01-03 09:36:00.082: main onReleased вызван для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4для цикла: 25249536

2018‌​-01-03 09:36:00.086: main onAcquired вызван для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4для цикла: 25249536

2018‌​-01-03 09:37:00.103: main onReleased вызван для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4для цикла: 25249536


person AmbGup    schedule 03.01.2018    source источник
comment
Это необходимо исследовать дополнительно. Можете ли вы попробовать версию 4.6.60, так как многие ошибки были исправлены?   -  person Peter Lawrey    schedule 04.01.2018
comment
Я тестировал 4.6.59 и не видел вышеуказанной проблемы в последней версии. Я пробовал 4.6.60 в репозитории maven, и он все еще недоступен mvnrepository.com/artifact /net.openhft/chronicle-queue. Может ли кто-нибудь добавить мне список рассылки для последних выпусков?   -  person AmbGup    schedule 05.01.2018
comment
mvnrepository немного медленный. Использование поиска maven будет более актуальным search.maven.org/# search%7Cga%7C1%7Cchronicle%20queue Последняя версия 4.6.61, поэтому я бы попробовал ее.   -  person Peter Lawrey    schedule 05.01.2018
comment
Я начал использовать 4.6.61 и не вижу поведения 4.5.27. Планируете ли вы дальнейшие выпуски? Я еще недостаточно уверен, чтобы продвигать 4.6.x в производство.   -  person AmbGup    schedule 05.01.2018
comment
не сегодня. Мы запланировали улучшения, но я не могу придумать, что касается вашей проблемы.   -  person Peter Lawrey    schedule 05.01.2018