У меня есть вопрос о том, что делать в случае медленного потребителя в разрушителе lmax, таком как кольцевой буфер, в котором есть несколько производителей и один потребитель, работающий на x86 Linux. С шаблоном кольцевого буфера, подобным lmax, вы постоянно перезаписываете данные, но что, если потребитель работает медленно. Поэтому, как вы справляетесь со случаем, когда, скажем, в кольцевом буфере размером 10 0-9 кольцевых слотов ваш потребитель находится в слоте 5, и теперь ваши писатели готовы начать запись слота 15, который также является слотом 5 в буфере (то есть: слот 5 = 15 % 10 )? Каков типичный способ справиться с этим, чтобы писатели по-прежнему производили данные в том порядке, в котором они поступали, и клиенты получали данные в том же порядке? Это действительно мой вопрос. Ниже приведены некоторые подробности о моем дизайне, и он отлично работает, просто в настоящее время у меня нет хорошего способа справиться с этой проблемой. Есть несколько потоков, выполняющих запись, и один поток, выполняющий чтение. Я не могу представить несколько потоков чтения без изменения существующего дизайна, который в настоящее время выходит за рамки текущего проекта, но по-прежнему заинтересован в ваших мыслях, если они включают это как решение.
Особенности дизайна
У меня есть кольцевой буфер, и в настоящее время в проекте есть несколько потоков производителей и один поток потребителя. Эта часть дизайна уже существует и не может быть изменена. Я пытаюсь удалить существующую систему очередей, используя кольцевой буфер без блокировки. То, что у меня есть, выглядит следующим образом.
Код работает на x86 Linux, есть несколько потоков для записи и один поток для чтения. Читатель и писатель начинают с интервалом в один слот и равны std::atomic<uint64_t>
, поэтому считыватель начинается со слота 0, а писатель — со слота 1, тогда каждый писатель сначала запрашивает слот, сначала выполняя атомарную операцию fetch_add(1, std::memory_order::memory_order_acq_rel)
в последовательности записи, вызывая incrementSequence
, показанную ниже, а затем использует цикл compare_and_swap для обновления последовательности считывателя, чтобы клиенты знали, что этот слот доступен, см. updateSequence
.
inline data_type incrementSequence() {
return m_sequence.fetch_add(1,std::memory_order::memory_order_seq_cst);
}
void updateSequence(data_type aOld, data_type aNew) {
while ( !m_sequence.compare_exchange_weak(aOld, aNew, std::memory_order::memory_order_release, std::memory_order_relaxed)
if (sequence() < aNew) {
continue;
}
break;
}
}
inline data_type sequence() const {
return m_sequence.load(std::memory_order::memory_order_acquire);
}