Цель
Я пытаюсь создать отношения зависимости между обработчиками, которые несколько цикличны, и я не могу понять, как это сделать правильно. Чего я хочу добиться, так это варианта producer -> [handlers 1-3] -> handler 4
.
Итак, disruptor.handleEventsWith(h1, h2, h3).then(h4);
. Но у меня есть дополнительные требования, которые
- Хотя обработчики 1-3 обрабатывают сообщения параллельно, ни один из них не начинает обработку следующего сообщения, пока все они не закончат обработку предыдущего сообщения.
- После первого сообщения обработчики 1-3 ждут, пока обработчик 4 закончит обработку самого последнего сообщения, прежде чем обрабатывать следующее сообщение.
Эквивалентная логика выполнения с использованием одного обработчика событий может быть следующей:
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
Arrays.asList(h1, h2, h3).parallelStream()
.forEach(h -> h.onEvent(event, sequence, endOfBatch));
h4.onEvent(event, sequence, endOfBatch);
});
Контекст
Контекст проекта заключается в том, что каждый из обработчиков 1-3 обновляет свое состояние в соответствии с сообщением и что после обработки сообщения каждым из трех обработчиков они находятся в согласованном состоянии. Затем обработчик 4 запускает некоторую логику на основе состояния, обновленного обработчиками 1-3. Таким образом, обработчик 4 должен видеть согласованные состояния только для структур данных, поддерживаемых 1-3, а это означает, что обработчики 1-3 не должны обрабатывать следующее сообщение, пока обработчик 4 не завершит работу.
(Хотя цель определенно состоит в том, чтобы использовать Disruptor для управления параллелизмом, а не java.util.Stream
.)
Не уверен, что это имеет значение, но это также тот случай, когда логика обработчика 4 может быть разбита на две части: одна требует, чтобы ни один из обработчиков 1-3 не обновлялся, а другая требует только завершения первой части обработчика 4. Таким образом, обработчики 1-3 могут обрабатывать сообщение, в то время как вторая часть обработчика 4 все еще выполняется.
Есть ли способ сделать это? Или может у меня схема неверная? Я чувствую, что должен быть способ сделать это через SequenceBarrier
, но я не совсем понимаю, как реализовать этот настраиваемый барьер. Для обработчиков 1-3 я думаю, что хотел бы сделать барьер с логикой handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence()
, но я не уверен, куда поместить эту логику.
Спасибо!