Очередь хроник V3. Могут ли записи быть потеряны при смене блока данных?

У меня есть приложение, которое записывает записи в очередь хроник (V3), которая также сохраняет значения индекса записи выдержки в других картах (хроники) путем предоставления индексированного доступа в очереди. Иногда нам не удается найти определенную запись, которую мы ранее сохранили, и я полагаю, что это может быть связано с пролонгацией блока данных.

Ниже приведена автономная тестовая программа, которая воспроизводит такие варианты использования в небольших масштабах. Он неоднократно записывает запись и немедленно пытается найти результирующее значение индекса, используя отдельный ExcerptTailer. Какое-то время все хорошо, пока первый блок данных не будет израсходован и не будет назначен второй файл данных, после чего начинаются сбои при извлечении. Если размер блока данных увеличивается во избежание пролонгации, то никакие записи не теряются. Также не вызывает проблем использование небольшого размера блока данных индекса, вызывающего создание нескольких индексных файлов.

Тестовая программа также пытается использовать ExcerptListener, работающий параллельно, чтобы увидеть, были ли записи, очевидно "потерянные" автором, когда-либо получены потоком чтения - это не так. Также пытается перечитать полученную очередь от начала до конца, что еще раз подтверждает, что они действительно потеряны.

Просматривая код, я вижу, что при поиске «отсутствующей записи» в AbstractVanillarExcerpt#index он успешно находит правильный объект VanillaMappedBytes из dataCache, но определяет, что записи нет. и смещение данных как len == 0. В дополнение к тому, что записи не были найдены, в какой-то момент после того, как проблемы начинают возникать после переноса, из метода VanillaMappedFile#fileChannel выдается NPE из-за того, что ему был передан нулевой файл. дорожка. Путь кода предполагает, что при успешном поиске записи в индексе всегда будет найден файл, но в данном случае это не так.

Можно ли надежно использовать Chronicle Queue при смене блоков данных, и если да, то что я делаю, что может вызвать проблему, с которой я сталкиваюсь?

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;

import org.junit.Before;
import org.junit.Test;

import net.openhft.affinity.AffinitySupport;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptCommon;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.VanillaChronicle;

public class ChronicleTests {
    private static final int CQ_LEN = VanillaChronicle.Cycle.DAYS.length();
    private static final long CQ_ENT = VanillaChronicle.Cycle.DAYS.entries();
    private static final String ROOT_DIR = System.getProperty(ChronicleTests.class.getName() + ".ROOT_DIR",
            "C:/Temp/chronicle/");
    private static final String QDIR = System.getProperty(ChronicleTests.class.getName() + ".QDIR", "chronicleTests");
    private static final int DATA_SIZE = Integer
            .parseInt(System.getProperty(ChronicleTests.class.getName() + ".DATA_SIZE", "100000"));
    // Chunk file size of CQ index
    private static final int INDX_SIZE = Integer
            .parseInt(System.getProperty(ChronicleTests.class.getName() + ".INDX_SIZE", "10000"));
    private static final int Q_ENTRIES = Integer
            .parseInt(System.getProperty(ChronicleTests.class.getName() + ".Q_ENTRIES", "5000"));
    // Data type id
    protected static final byte FSYNC_DATA = 1;
    protected static final byte NORMAL_DATA = 0;
    protected static final byte TH_START_DATA = -1;
    protected static final byte TH_END_DATA = -2;
    protected static final byte CQ_START_DATA = -3;
    private static final long MAX_RUNTIME_MILLISECONDS = 30000;

    private static String PAYLOAD_STRING = "1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    private static byte PAYLOAD_BYTES[] = PAYLOAD_STRING.getBytes();

    private Chronicle _chronicle;
    private String _cqPath = ROOT_DIR + QDIR;

    @Before
    public void init() {
        buildCQ();
    }

    @Test
    public void test() throws IOException, InterruptedException {
        boolean passed = true;
        Collection<Long> missingEntries = new LinkedList<Long>();
        long sent = 0;
        Thread listener = listen();
        try {
            listener.start();
            // Write entries to CQ, 
            for (int i = 0; i < Q_ENTRIES; i++) {
                long entry = writeQEntry(PAYLOAD_BYTES, (i % 100) == 0);
                sent++;
                // check each entry can be looked up
                boolean found = checkEntry(i, entry);
                if (!found)
                    missingEntries.add(entry);
                passed &= found;
            }
            // Wait awhile for the listener
            listener.join(MAX_RUNTIME_MILLISECONDS);
            if (listener.isAlive())
                listener.interrupt();
        } finally {
            if (listener.isAlive()) { // => exception raised so wait for listener
                log("Give listener a chance....");
                sleep(MAX_RUNTIME_MILLISECONDS);
                listener.interrupt();
            }
            log("Sent: " + sent + " Received: " + _receivedEntries.size());
            // Look for missing entries in receivedEntries 
            missingEntries.forEach(me -> checkMissingEntry(me));
            log("All passed? " + passed);
            // Try to find missing entries by searching from the start...
            searchFromStartFor(missingEntries);
            _chronicle.close();
            _chronicle = null;
            // Re-initialise CQ and look for missing entries again...
            log("Re-initialise");
            init();
            searchFromStartFor(missingEntries);
        }
    }

    private void buildCQ() {
        try {
            // build chronicle queue
            _chronicle = ChronicleQueueBuilder.vanilla(_cqPath).cycleLength(CQ_LEN).entriesPerCycle(CQ_ENT)
                    .indexBlockSize(INDX_SIZE).dataBlockSize(DATA_SIZE).build();
        } catch (IOException e) {
            throw new InitializationException("Failed to initialize Active Trade Store.", e);
        }
    }

    private long writeQEntry(byte dataArray[], boolean fsync) throws IOException {
        ExcerptAppender appender = _chronicle.createAppender();
        return writeData(appender, dataArray, fsync);
    }

    private boolean checkEntry(int seqNo, long entry) throws IOException {
        ExcerptTailer tailer = _chronicle.createTailer();
        if (!tailer.index(entry)) {
            log("SeqNo: " + seqNo + " for entry + " + entry + " not found");
            return false;
        }
        boolean isMarker = isMarker(tailer);
        boolean isFsyncData = isFsyncData(tailer);
        boolean isNormalData = isNormalData(tailer);
        String type = isMarker ? "MARKER" : isFsyncData ? "FSYNC" : isNormalData ? "NORMALDATA" : "UNKNOWN";
        log("Entry: " + entry + "(" + seqNo + ") is " + type);
        return true;
    }

    private void log(String string) {
        System.out.println(string);
    }

    private void searchFromStartFor(Collection<Long> missingEntries) throws IOException {
        Set<Long> foundEntries = new HashSet<Long>(Q_ENTRIES);
        ExcerptTailer tailer = _chronicle.createTailer();
        tailer.toStart();
        while (tailer.nextIndex())
            foundEntries.add(tailer.index());
        Iterator<Long> iter = missingEntries.iterator();
        long foundCount = 0;
        while (iter.hasNext()) {
            long me = iter.next();
            if (foundEntries.contains(me)) {
                log("Found missing entry: " + me);
                foundCount++;
            }
        }
        log("searchFromStartFor Found: " + foundCount + " of: " + missingEntries.size() + " missing entries");
    }

    private void checkMissingEntry(long missingEntry) {
        if (_receivedEntries.contains(missingEntry))
            log("Received missing entry:" + missingEntry);
    }

    Set<Long> _receivedEntries = new HashSet<Long>(Q_ENTRIES);

    private Thread listen() {
        Thread returnVal = new Thread("Listener") {

            public void run() {
                try {
                    int receivedCount = 0;
                    ExcerptTailer tailer = _chronicle.createTailer();
                    tailer.toStart();
                    while (receivedCount < Q_ENTRIES) {
                        if (tailer.nextIndex()) {
                            _receivedEntries.add(tailer.index());
                        } else {
                            ChronicleTests.this.sleep(1);
                        }
                    }
                    log("listener complete");
                } catch (IOException e) {
                    log("Interupted before receiving all entries");
                }
            }
        };
        return returnVal;
    }

    private void sleep(long interval) {
        try {
            Thread.sleep(interval);
        } catch (InterruptedException e) {
            // No action required
        }
    }

    protected static final int THREAD_ID_LEN = Integer.SIZE / Byte.SIZE;
    protected static final int DATA_TYPE_LEN = Byte.SIZE / Byte.SIZE;
    protected static final int TIMESTAMP_LEN = Long.SIZE / Byte.SIZE;
    protected static final int CRC_LEN = Long.SIZE / Byte.SIZE;

    protected static long writeData(ExcerptAppender appender, byte dataArray[],
            boolean fsync) {
        appender.startExcerpt(DATA_TYPE_LEN + THREAD_ID_LEN + dataArray.length
                + CRC_LEN);
        appender.nextSynchronous(fsync);
        if (fsync) {
            appender.writeByte(FSYNC_DATA);
        } else {
            appender.writeByte(NORMAL_DATA);
        }
        appender.writeInt(AffinitySupport.getThreadId());
        appender.write(dataArray);
        appender.writeLong(CRCCalculator.calcDataAreaCRC(appender));
        appender.finish();
        return appender.lastWrittenIndex();
    }

    protected static boolean isMarker(ExcerptCommon excerpt) {
        if (isCqStartMarker(excerpt) || isStartMarker(excerpt) || isEndMarker(excerpt)) {
            return true;
        }
        return false;
    }

    protected static boolean isCqStartMarker(ExcerptCommon excerpt) {
        return isDataTypeMatched(excerpt, CQ_START_DATA);
    }

    protected static boolean isStartMarker(ExcerptCommon excerpt) {
        return isDataTypeMatched(excerpt, TH_START_DATA);
    }

    protected static boolean isEndMarker(ExcerptCommon excerpt) {
        return isDataTypeMatched(excerpt, TH_END_DATA);
    }

    protected static boolean isData(ExcerptTailer tailer, long index) {
        if (!tailer.index(index)) {
            return false;
        }
        return isData(tailer);
    }

    private static void movePosition(ExcerptCommon excerpt, long position) {
        if (excerpt.position() != position)
            excerpt.position(position);
    }

    private static void moveToFsyncFlagPos(ExcerptCommon excerpt) {
        movePosition(excerpt, 0);
    }

    private static boolean isDataTypeMatched(ExcerptCommon excerpt, byte type) {
        moveToFsyncFlagPos(excerpt);
        byte b = excerpt.readByte();
        if (b == type) {
            return true;
        }
        return false;
    }

    protected static boolean isNormalData(ExcerptCommon excerpt) {
        return isDataTypeMatched(excerpt, NORMAL_DATA);
    }

    protected static boolean isFsyncData(ExcerptCommon excerpt) {
        return isDataTypeMatched(excerpt, FSYNC_DATA);
    }

    /**
     * Check if this entry is Data
     * 
     * @param excerpt
     * @return true if the entry is data
     */
    protected static boolean isData(ExcerptCommon excerpt) {
        if (isNormalData(excerpt) || isFsyncData(excerpt)) {
            return true;
        }
        return false;
    }

}

person Paul Bandler    schedule 30.06.2017    source источник


Ответы (1)


Проблема возникает только при инициализации размера блока данных со значением, которое не является степенью двойки. Встроенные конфигурации на IndexedChronicleQueueBuilder (small(), medium(), large()) позаботятся об инициализации с использованием степеней двойки, что дало ключ к правильному использованию.

Несмотря на приведенный выше ответ о поддержке, который я очень ценю, было бы полезно, если бы знающий пользователь Chronicle мог подтвердить, что целостность Chronicle Queue зависит от использования блока данных, равного степени двойки.

person Paul Bandler    schedule 05.07.2017
comment
Размер блока должен быть степенью двойки, и должен быть код, обеспечивающий это. В v4 размер округляется до следующей степени числа 2. - person Peter Lawrey; 22.08.2017