Как бы вы реализовали рабочую очередь в etcd

Я только начал изучать etcd, и один из вариантов использования, который упоминается в выступлениях создателей, — это система очереди работ.

Но как бы вы это реализовали?

Базовый шаблон будет примерно таким.

1 процесс, генерирующий «тикеты с описанием работы», и поместите этот тикет в папку etcd, скажем, «/queue/worktickets/00000000001/»

1->многие процессы прослушивают папку «/queue/worktickets/» на наличие изменений. когда появится новый рабочий билет, каждый процесс попытается захватить билет, создав новое значение в "/queue/locks/00000001", чтобы заблокировать этот билет. Только первый сможет создать значение блокировки.

Процесс, создавший билет блокировки, выполняет свою работу, а затем удаляет билет из очереди и, возможно, значение блокировки. Затем попробуйте взять следующий доступный билет из очереди. Если билетов больше нет, снова начните прослушивать изменения в папке «/queue/worktickets/».

На мой взгляд, это должно быть довольно просто реализовать, но если очередь становится большой (билет легко сгенерировать, но сложно обработать), то кажется, что будет много данных, передаваемых из etcd каждому из клиентов. Насколько я знаю, невозможно сказать, дайте мне первое значение в этой папке, которое не существует в этой папке, и нет никаких n лучших элементов из папки.

Любой хочет поделиться своими мыслями по этому поводу.


person PEtter    schedule 06.01.2016    source источник
comment
Репозиторий etcd содержит рецепты, включающие очередь, который может быть вам полезен.   -  person Tim Bunce    schedule 30.08.2018


Ответы (2)


Поэтому я придумал решение, которое считаю надежным. Вот цели, для которых я разработал свое решение:

  • Рабочим должно быть эффективно (т. е. O(1) круговых обращений по сети) получение элемента из рабочей очереди.
  • Если рабочий умирает или иным образом терпит неудачу при обработке предмета, предмет становится доступным для других рабочих.

Итак, идея состоит в том, чтобы иметь две очереди: ожидающую очередь и текущую очередь. Изначально все элементы находятся в очереди ожидания.

Когда рабочий процесс пытается получить элемент, он передает его из очереди ожидания в очередь выполнения с помощью транзакции (доступной в etcd 3). В той же транзакции рабочий также создает блокировку для элемента. Замок охраняется арендой, поэтому он автоматически удаляется, если работник умирает.

Если рабочий процесс успешно завершает обработку элемента, он удаляет элемент из текущей очереди, и все готово. В случае сбоя рабочего процесса блокировка истечет, а элемент останется в очереди выполнения.

Таким образом, ожидается, что рабочие также будут просматривать текущую очередь после того, как ожидающая очередь будет исчерпана. Ожидается, что текущая очередь будет меньше по сравнению с ожидающей очередью, поэтому поиск элемента, который в настоящее время не заблокирован (простым перечислением текущей очереди), не будет дорогим.

Или, как упомянул @dannysauer в комментариях, у вас также может быть другой отказоустойчивый процесс, который переносит элементы из текущей очереди обратно в ожидающую очередь.

person Derek Chiang    schedule 02.01.2017
comment
Спасибо Дереку и @dannysauer за отличные ответы. Мне нужно реализовать что-то подобное с потенциально большими (100 КБ) значениями. Вместо того, чтобы копировать в ожидающую папку, что кажется дорогостоящим, было бы более эффективно просто проверить совпадающие блокировки и взять самый старый рабочий элемент, который не имеет соответствующей блокировки, а затем попытаться создать блокировку? Я предполагаю, что чтение ключей в etcd намного дешевле, чем попытка их создания. Похоже, что это также будет ловить любые элементы, срок действия блокировки которых истекает. - person Sap; 07.05.2017
comment
Прошло некоторое время, поэтому я могу ошибаться, но я не думаю, что это возможно check matching locks and take the oldest "work item" that doesn't have a corresponding lock эффективным образом. - person Derek Chiang; 17.05.2017

Я полагаю, что вы уже решили эту проблему, но я бы сделал это, перечислив содержимое каталога рабочей очереди (что вы получаете, когда вы в любом случае извлекаете каталог). Затем просто начните спускаться по списку, пытаясь создать блокировки с одинаковыми именами в каталоге блокировки, пока не получите блокировку. Создание «файла» блокировки является атомарным, если вы используете флаг «prevExist = false», поэтому, если вы успешно создали его, вы тот, у кого есть блокировка для этого элемента.

В идеале у вас должна быть грубая оценка того, сколько времени потребуется для обработки элемента, и вы устанавливаете TTL немного больше, чем это (или вы периодически обновляете TTL после шагов, время которых вы можете оценить). Либо вы удаляете элемент из исходной очереди (и, возможно, воссоздаете его в «завершенном» каталоге), когда закончите, либо срок действия вашей блокировки истекает, и кто-то другой забирает его.

Кроме того, в идеале вы должны поместить свой уникальный «идентификатор узла» (имя хоста, что угодно) в файл блокировки, чтобы ваши обновления TTL выполняли сравнение и установку, что не удастся, если вы потеряете блокировку из-за слишком долгого времени.

Предположительно, в рабочем каталоге будут элементы, созданные последовательно с использованием POST в каталоге, в то время как в очереди блокировки и завершенном каталоге будут элементы, созданные по имени с использованием PUT.

person dannysauer    schedule 23.03.2016
comment
Но что, если у вас много элементов в рабочей очереди? Вы, конечно, не хотите пытаться заблокировать их один за другим. Есть ли способ эффективно получить элемент, который не заблокирован? - person Derek Chiang; 02.01.2017
comment
Если у вас много клиентов, вы должны использовать пару очередей. Задания поступают в очередь ожидания, затем клиенты перемещают ожидающее задание в очередь выполнения, когда клиент обрабатывает задание, а затем либо перемещаются в завершенную очередь по завершении, либо удаляются. Вам нужно что-то, чтобы следить за блокировками и перемещать вещи из незавершенных обратно в ожидающие, если они не будут завершены до истечения срока блокировки. - person dannysauer; 02.01.2017
comment
Альтернативный подход состоит в том, чтобы рабочая очередь использовала последовательно нумерованные записи (что etcd сделает за вас), а затем использовала счетчик, для которого каждый клиент выполняет атомарное приращение. Вам не нужно размещать каталог таким образом, поскольку вы просто выбираете один элемент, соответствующий счетчику. Но тогда у вас есть один спорный момент, от которого зависят все клиенты, что может быть проблемой в зависимости от определения работы. - person dannysauer; 02.01.2017
comment
Я опубликовал решение, похожее на упомянутый вами подход с несколькими очередями. Что касается подхода с последовательными именами: я думаю, что он отлично работает для распределения элементов, но неясно, как вы вернете элемент обратно в очередь, если рабочий выйдет из строя, поскольку счетчик продвинулся бы мимо элемента. - person Derek Chiang; 02.01.2017
comment
Ага; со счетчиком вам все равно понадобится операция очистки, которая проверяет незавершенные задания, если незавершенные задания вызывают беспокойство. Мне все равно не нравится один общий замок для всего, большую часть времени. - person dannysauer; 02.01.2017