Грег Янг в своем документе по CQRS в разделе Создание хранилища событий при записи событий в хранилище событий, которое он проверил на оптимистичный параллелизм. Я действительно не понимаю, почему он сделал эту проверку, может ли кто-нибудь объяснить мне на конкретном примере.
Хранилище событий и оптимистичный параллелизм
Ответы (2)
Я действительно не понимаю, почему он сделал эту проверку, может ли кто-нибудь объяснить мне на конкретном примере.
Хранилища событий должны быть постоянными в том смысле, что после записи события оно будет видно при каждом последующем чтении. Таким образом, каждое действие в базе данных должно быть добавлением. Полезной мысленной моделью является представление односвязного списка.
Если база данных будет поддерживать более одного потока выполнения с доступом для записи, вы столкнетесь с проблемой «потерянных обновлений». В виде связанных списков это может выглядеть так:
Thread(1) [... <- 69726c3e <- /x.tail] = get(/x)
Thread(2) [... <- 69726c3e <- /x.tail] = get(/x)
Thread(1) set(/x, [ ... <- 69726c3e <- 709726c3 <- /x.tail])
Thread(2) set(/x, [ ... <- 69726c3e <- 83b97195 <- /x.tail])
История, записанная потоком (2), не включает событие: 709726c3, записанное потоком (1). Таким образом, «потерянное обновление».
В универсальной базе данных вы обычно управляете этим с помощью транзакций: какая-то скрытая магия отслеживает все ваши зависимости данных, и если предварительные условия не выполняются, когда вы пытаетесь зафиксировать транзакцию, вся ваша работа отклоняется.
Но в хранилищах событий нет необходимости использовать все степени свободы, которые поддерживают общий случай — редактирование событий, хранящихся в базе данных, запрещено, равно как и изменение зависимостей между событиями.
Единственная изменяемая часть изменения - единственное место, где мы заменяем старое значение новым значением - это когда мы меняем /x.tail
Thread(1) [... <- 69726c3e <- /x.tail] = get(/x)
Thread(2) [... <- 69726c3e <- /x.tail] = get(/x)
Thread(1) set(/x, [ ... <- 69726c3e <- 709726c3 <- /x.tail])
Thread(2) set(/x, [ ... <- 69726c3e <- 83b97195 <- /x.tail])
Проблема здесь просто в том, что Thread(2) подумал, что 6 <- /x.tail
было истинным, и заменил его значением, которое потеряло событие 7. Если мы изменим нашу запись с set
на compare-and-set
...
Thread(1) [... <- 69726c3e <- /x.tail] = get(/x)
Thread(2) [... <- 69726c3e <- /x.tail] = get(/x)
Thread(1) compare-and-set(/x, 69726c3e <- /x.tail, [ ... <- 69726c3e <- 709726c3 <- /x.tail])
Thread(2) compare-and-set(/x, 69726c3e <- /x.tail, [ ... <- 69726c3e <- 83b97195 <- /x.tail]) // FAILS
тогда хранилище данных может обнаружить конфликт и отклонить недопустимую запись.
Конечно, если хранилище данных увидит действия потоков в другом порядке, то сбойная команда может измениться.
Thread(1) [... <- 69726c3e <- /x.tail] = get(/x)
Thread(2) [... <- 69726c3e <- /x.tail] = get(/x)
Thread(2) compare-and-set(/x, 69726c3e <- /x.tail, [ ... <- 69726c3e <- 83b97195 <- /x.tail])
Thread(1) compare-and-set(/x, 69726c3e <- /x.tail, [ ... <- 69726c3e <- 709726c3 <- /x.tail]) // FAILS
Проще говоря, где set
дает нам семантику «выигрывает последний писатель», compare-and-set
дает нам «выигрывает первый писатель», что устраняет проблему потерянного обновления.
TLDR; Эта проверка параллелизма необходима, потому что то, какие события генерируются, зависит от предыдущих событий. Таким образом, если есть другие события, которые одновременно генерируются другим процессом, решение необходимо принять заново.
Способ использования хранилища событий выглядит следующим образом:
- Старые события загружаются из потока событий (= раздел в хранилище событий, содержащий все события, сгенерированные экземпляром Aggregate).
- Старые события обрабатываются/применяются Агрегатом, которому они принадлежат, в том порядке, в котором они были сгенерированы.
- Aggregate, основываясь на внутреннем состоянии, которое было создано из этих событий, решает создать несколько новых событий.
- Эти новые события добавляются в Eventstream.
Итак, шаг 3 зависит от предыдущих событий, которые были сгенерированы до выполнения этой команды.
Если некоторые события, сгенерированные параллельно другим процессом, добавляются к одному и тому же потоку событий, это означает, что принятое решение было основано на ложной предпосылке и, следовательно, должно быть принято повторно, повторив шаг 1.
Events are emitted as a reaction to commands
- конечно, я уже писал: So, step 3 depends on the previous events that were generated before this command is executed
- person Constantin Galbenu; 13.09.2018
if we have one instance by aggregate ID, I don't see the need to check for concurrency problem
— у вас может легко возникнуть ситуация, когда один и тот же экземпляр Aggregate обрабатывает больше команд одновременно
- person Constantin Galbenu; 13.09.2018