График зависимости LMAX Disruptor/Gating с SequenceBarrier

Цель

Я пытаюсь создать отношения зависимости между обработчиками, которые несколько цикличны, и я не могу понять, как это сделать правильно. Чего я хочу добиться, так это варианта producer -> [handlers 1-3] -> handler 4.

Итак, disruptor.handleEventsWith(h1, h2, h3).then(h4);. Но у меня есть дополнительные требования, которые

  1. Хотя обработчики 1-3 обрабатывают сообщения параллельно, ни один из них не начинает обработку следующего сообщения, пока все они не закончат обработку предыдущего сообщения.
  2. После первого сообщения обработчики 1-3 ждут, пока обработчик 4 закончит обработку самого последнего сообщения, прежде чем обрабатывать следующее сообщение.

Эквивалентная логика выполнения с использованием одного обработчика событий может быть следующей:

disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
  Arrays.asList(h1, h2, h3).parallelStream()
        .forEach(h -> h.onEvent(event, sequence, endOfBatch));
  h4.onEvent(event, sequence, endOfBatch);
});

Контекст

Контекст проекта заключается в том, что каждый из обработчиков 1-3 обновляет свое состояние в соответствии с сообщением и что после обработки сообщения каждым из трех обработчиков они находятся в согласованном состоянии. Затем обработчик 4 запускает некоторую логику на основе состояния, обновленного обработчиками 1-3. Таким образом, обработчик 4 должен видеть согласованные состояния только для структур данных, поддерживаемых 1-3, а это означает, что обработчики 1-3 не должны обрабатывать следующее сообщение, пока обработчик 4 не завершит работу.

(Хотя цель определенно состоит в том, чтобы использовать Disruptor для управления параллелизмом, а не java.util.Stream.)

Не уверен, что это имеет значение, но это также тот случай, когда логика обработчика 4 может быть разбита на две части: одна требует, чтобы ни один из обработчиков 1-3 не обновлялся, а другая требует только завершения первой части обработчика 4. Таким образом, обработчики 1-3 могут обрабатывать сообщение, в то время как вторая часть обработчика 4 все еще выполняется.

Есть ли способ сделать это? Или может у меня схема неверная? Я чувствую, что должен быть способ сделать это через SequenceBarrier, но я не совсем понимаю, как реализовать этот настраиваемый барьер. Для обработчиков 1-3 я думаю, что хотел бы сделать барьер с логикой handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence(), но я не уверен, куда поместить эту логику.

Спасибо!


person John Dorian    schedule 02.05.2017    source источник


Ответы (1)


Я бы подумал о том, чтобы обработчики не имели состояния и использовали обработанные ими сообщения, чтобы содержать состояние вашей системы. Таким образом, вам вообще не нужно будет синхронизировать ваши обработчики.

person 780Farva    schedule 25.08.2017