предотвращение столкновений при сворачивании бесконечного буфера без блокировки в циклический буфер

Я решаю проблему арбитража двух каналов по протоколу FAST. Пожалуйста, не беспокойтесь, если вы не знакомы с этим, мой вопрос на самом деле довольно общий. Но добавляю описание проблемы для интересующихся (можете пропустить).


Данные во всех каналах UDP распространяются в двух идентичных каналах (A и B) на двух разных многоадресных IP-адресах. Настоятельно рекомендуется, чтобы клиент получал и обрабатывал оба канала из-за возможной потери пакетов UDP. Обработка двух одинаковых фидов позволяет статистически снизить вероятность потери пакетов. Не уточняется, в какой именно ленте (А или Б) сообщение появляется впервые. Для арбитража этих каналов следует использовать порядковый номер сообщения, указанный в преамбуле или в теге 34-MsgSeqNum. Использование преамбулы позволяет определить порядковый номер сообщения без декодирования сообщения FAST. Обработка сообщений из фидов А и Б должна производиться по следующему алгоритму:

  1. Слушайте каналы A и B
  2. Обрабатывайте сообщения в соответствии с их порядковыми номерами.
  3. Игнорировать сообщение, если сообщение с таким же порядковым номером уже было обработано ранее.
  4. Если появляется пробел в порядковом номере, это указывает на потерю пакетов в обоих каналах (A и B). Клиент должен инициировать один из процессов восстановления. Но в первую очередь клиент должен подождать разумное время, возможно, потерянный пакет придет чуть позже из-за переупорядочения пакетов. Протокол UDP не может гарантировать доставку пакетов в последовательности.

    // далее алгоритм восстановления tcp


Я написал такой очень простой класс. Он предварительно выделяет все необходимые классы, а затем первый поток, который получает конкретный seqNum, может его обработать. Другой поток отбросит его позже:

class MsgQueue
{
public:
    MsgQueue();
    ~MsgQueue(void);
    bool Lock(uint32_t msgSeqNum);
    Msg& Get(uint32_t msgSeqNum);
    void Commit(uint32_t msgSeqNum);
private:
    void Process();
    static const int QUEUE_LENGTH = 1000000;

    // 0 - available for use; 1 - processing; 2 - ready
    std::atomic<uint16_t> status[QUEUE_LENGTH];
    Msg updates[QUEUE_LENGTH];
};

Выполнение:

MsgQueue::MsgQueue()
{
        memset(status, 0, sizeof(status));
}

MsgQueue::~MsgQueue(void)
{
}

// For the same msgSeqNum should return true to only one thread 
bool MsgQueue::Lock(uint32_t msgSeqNum)
{
    uint16_t expected = 0;
    return status[msgSeqNum].compare_exchange_strong(expected, 1);
}

void MsgQueue::Commit(uint32_t msgSeqNum)
{
    status[msgSeqNum] = 2;
            Process();
}

    // this method probably should be combined with "Lock" but please ignore! :)
Msg& MsgQueue::Get(uint32_t msgSeqNum)
{
    return updates[msgSeqNum];
}

void MsgQueue::Process()
{
        // ready packets must be processed, 
}

Использование:

if (!msgQueue.Lock(seq)) {
    return;
}
Msg msg = msgQueue.Get(seq);
msg.Ticker = "HP"
msg.Bid = 100;
msg.Offer = 101;
msgQueue.Commit(seq);

Это прекрасно работает, если предположить, что QUEUE_LENGTH равно бесконечности. Потому что в этом случае один msgSeqNum = один updates элемент массива.

Но я должен сделать буфер циклическим, потому что невозможно хранить всю историю (многие миллионы пакетов), и нет причин для этого. На самом деле мне нужно буферизовать достаточно пакетов для восстановления сеанса, и после восстановления сеанса я могу их удалить.

Но наличие кольцевого буфера существенно усложняет алгоритм. Например, предположим, что у нас есть циклический буфер длиной 1000. И в то же время мы пытаемся обработать seqNum = 10 000 и seqNum = 11 000 (это ОЧЕНЬ маловероятно, но все же возможно). Оба этих пакета будут отображены в массив updates с индексом 0, поэтому произойдет коллизия. В этом случае буфер должен «отбрасывать» старые пакеты и обрабатывать новые.

Реализовать то, что я хочу, используя locks, тривиально, но написать код lock-free в циклическом буфере, который используется из разных потоков, действительно сложно. Поэтому я приветствую любые предложения и советы, как это сделать. Спасибо!


person Oleg Vazhnev    schedule 24.04.2013    source источник
comment
если длина вашего буфера равна 1000, seqNum 10000 или 11000 не должны использоваться.   -  person Mare Infinitus    schedule 25.04.2013
comment
@MareInfinitus, поэтому мне нужно использовать круговой буфер   -  person Oleg Vazhnev    schedule 25.04.2013


Ответы (1)


Я не думаю, что вы можете использовать кольцевой буфер. В массиве status[] можно использовать хешированный индекс. То есть, hash = seq % 1000. Проблема в том, что порядковый номер диктуется сетью, и вы не можете контролировать его порядок. Вы хотите блокировать на основе этого порядкового номера. Ваш массив не обязательно должен быть бесконечным, достаточно диапазона порядкового номера; но это, вероятно, больше, чем практично.

Я не уверен, что происходит, когда порядковый номер заблокирован. Означает ли это, что его обрабатывает другой поток? Если это так, вы должны поддерживать подсписок для конфликтов хэшей, чтобы разрешать конкретный порядковый номер.

Вы также можете рассматривать размер массива как степень числа 2. Например, 1024 позволит hash = seq & 1023;, что должно быть весьма эффективным.

person Community    schedule 28.04.2013
comment
спасибо за ответ! ведь я создал ДВА круговых буфера. порядковый номер создается биржей и увеличивается один за другим. из-за проблем с сетью, однако некоторые из них могут быть пропущены, и у меня есть механизм для обнаружения и решения таких проблем. поэтому решение было - создать ДВА циклических буфера вместо ОДНОГО, а затем добавить еще один ВРАЩАЮЩИЙСЯ поток, который объединяет эти циклические буферы и процесс. - person Oleg Vazhnev; 28.04.2013
comment
Хм, я был бы обеспокоен тем, что иногда пакеты могут быть переупорядочены. - person artless noise; 28.04.2013
comment
я навожу порядок. вот почему у меня есть круговой буфер. (в реальной жизни, кстати, кажется, что они НИКОГДА не меняют порядок в моей конфигурации, я тестировал несколько дней :) - person Oleg Vazhnev; 28.04.2013
comment
Повторный заказ происходит довольно редко. Это зависит от того, где вы сидите в сети. Я недостаточно думал о вашем решении. Главное, что исходная задача нужна для преобразования seqnum. Я думаю, у вас есть очередь «задания» (входящие seq) и очередь «работы» (обработка пакетов), и вы больше ничего не вводите в seq. Я изменил свой ответ на «режим сообщества». Вы можете исправить или оставить свой, а этот я удалю. - person artless noise; 29.04.2013