Я решаю проблему арбитража двух каналов по протоколу FAST. Пожалуйста, не беспокойтесь, если вы не знакомы с этим, мой вопрос на самом деле довольно общий. Но добавляю описание проблемы для интересующихся (можете пропустить).
Данные во всех каналах UDP распространяются в двух идентичных каналах (A и B) на двух разных многоадресных IP-адресах. Настоятельно рекомендуется, чтобы клиент получал и обрабатывал оба канала из-за возможной потери пакетов UDP. Обработка двух одинаковых фидов позволяет статистически снизить вероятность потери пакетов. Не уточняется, в какой именно ленте (А или Б) сообщение появляется впервые. Для арбитража этих каналов следует использовать порядковый номер сообщения, указанный в преамбуле или в теге 34-MsgSeqNum. Использование преамбулы позволяет определить порядковый номер сообщения без декодирования сообщения FAST. Обработка сообщений из фидов А и Б должна производиться по следующему алгоритму:
- Слушайте каналы A и B
- Обрабатывайте сообщения в соответствии с их порядковыми номерами.
- Игнорировать сообщение, если сообщение с таким же порядковым номером уже было обработано ранее.
Если появляется пробел в порядковом номере, это указывает на потерю пакетов в обоих каналах (A и B). Клиент должен инициировать один из процессов восстановления. Но в первую очередь клиент должен подождать разумное время, возможно, потерянный пакет придет чуть позже из-за переупорядочения пакетов. Протокол UDP не может гарантировать доставку пакетов в последовательности.
// далее алгоритм восстановления tcp
Я написал такой очень простой класс. Он предварительно выделяет все необходимые классы, а затем первый поток, который получает конкретный seqNum
, может его обработать. Другой поток отбросит его позже:
class MsgQueue
{
public:
MsgQueue();
~MsgQueue(void);
bool Lock(uint32_t msgSeqNum);
Msg& Get(uint32_t msgSeqNum);
void Commit(uint32_t msgSeqNum);
private:
void Process();
static const int QUEUE_LENGTH = 1000000;
// 0 - available for use; 1 - processing; 2 - ready
std::atomic<uint16_t> status[QUEUE_LENGTH];
Msg updates[QUEUE_LENGTH];
};
Выполнение:
MsgQueue::MsgQueue()
{
memset(status, 0, sizeof(status));
}
MsgQueue::~MsgQueue(void)
{
}
// For the same msgSeqNum should return true to only one thread
bool MsgQueue::Lock(uint32_t msgSeqNum)
{
uint16_t expected = 0;
return status[msgSeqNum].compare_exchange_strong(expected, 1);
}
void MsgQueue::Commit(uint32_t msgSeqNum)
{
status[msgSeqNum] = 2;
Process();
}
// this method probably should be combined with "Lock" but please ignore! :)
Msg& MsgQueue::Get(uint32_t msgSeqNum)
{
return updates[msgSeqNum];
}
void MsgQueue::Process()
{
// ready packets must be processed,
}
Использование:
if (!msgQueue.Lock(seq)) {
return;
}
Msg msg = msgQueue.Get(seq);
msg.Ticker = "HP"
msg.Bid = 100;
msg.Offer = 101;
msgQueue.Commit(seq);
Это прекрасно работает, если предположить, что QUEUE_LENGTH равно бесконечности. Потому что в этом случае один msgSeqNum = один updates
элемент массива.
Но я должен сделать буфер циклическим, потому что невозможно хранить всю историю (многие миллионы пакетов), и нет причин для этого. На самом деле мне нужно буферизовать достаточно пакетов для восстановления сеанса, и после восстановления сеанса я могу их удалить.
Но наличие кольцевого буфера существенно усложняет алгоритм. Например, предположим, что у нас есть циклический буфер длиной 1000. И в то же время мы пытаемся обработать seqNum = 10 000 и seqNum = 11 000 (это ОЧЕНЬ маловероятно, но все же возможно). Оба этих пакета будут отображены в массив updates
с индексом 0
, поэтому произойдет коллизия. В этом случае буфер должен «отбрасывать» старые пакеты и обрабатывать новые.
Реализовать то, что я хочу, используя locks
, тривиально, но написать код lock-free
в циклическом буфере, который используется из разных потоков, действительно сложно. Поэтому я приветствую любые предложения и советы, как это сделать. Спасибо!