В части 1 мы говорили о том, как задания отправляются в очередь. В этом разделе мы узнаем, как задания выбираются из очереди и обрабатываются. Мы также рассмотрим, что произойдет, если задание вызовет какое-либо исключение или как реализуются тайм-ауты. Итак, давайте начнем.

Обработка задания

До сих пор мы только отправляли задание в очередь, если их никто не обрабатывал, они просто продолжали накапливаться в хранилище очереди. Здесь в игру вступает обработчик очереди. Рабочие процессы — это длительные процессы, которые запускаются отдельно, обычно контролируются каким-либо монитором процессов, например супервизором, чтобы его можно было перезапустить, если какой-либо из процессов остановился. Мы запускаем воркер, запуская php artisan queue:work, поэтому эта команда — первое, что мы рассмотрим.

Команда queue:work определена внутри класса Illuminate\Queue\Console\WorkCommand, давайте посмотрим на метод handle() этого класса:

Первая часть просто проверяет, находится ли приложение в режиме обслуживания и был ли установлен флаг --once (это обработает одно задание и завершится), и если это так, процесс будет бездействовать в течение настроенного времени и завершится. Метод listendForEvents() регистрирует обработчики событий, связанных с заданием, и выводит вывод на консоль. Следующая часть определяет, из какого соединения рабочий процесс должен получать задания. Если указано имя соединения, он использует это соединение, в противном случае он будет использовать соединение по умолчанию. Точно так же, если очередь была предоставлена, она используется, в противном случае для выборки заданий будет использоваться очередь по умолчанию. Можно указать несколько очередей, разделяя их запятой (например, --queue=emails,default ), здесь имя очереди, которое идет первым, имеет более высокий приоритет. Мы увидим, как это работает чуть позже. Наконец, он запускает метод runWorker(), передавая в качестве аргументов имя соединения и имя очереди.

Обработка заданий переносится на класс Illuminate\Queue\Worker, в зависимости от того, был ли предоставлен флаг --once, метод runWorker() будет вызывать runNextJob() или daemon() для базового экземпляра рабочего процесса. runNextJob() берет одно задание из очереди, обрабатывает его и затем выходит. С другой стороны, метод daemon() запускает бесконечный цикл, который продолжает извлекать задания из очереди и обрабатывать их. Мы рассмотрим этот метод daemon() более подробно.

Методы supportsAsyncSignals() проверяют, загружено ли расширение pcntl, и настраивают прослушиватели событий для SIGTERM, SIGUSER2 и SIGCONT. Сигнал SIGTERM, вероятно, используется для корректного завершения рабочего процесса, а два других, по-видимому, используются для приостановки и продолжения рабочего процесса.

Первое, что происходит внутри бесконечного цикла, — это проверка, должна ли текущая итерация цикла продолжаться или нет. Метод daemonShouldRun() в основном проверяет, находится ли приложение в режиме обслуживания (который вы можете переопределить, передав флаг --force), приостановлен ли рабочий процесс и предотвращает ли какой-либо прослушиватель событий цикл или нет. Линия

$this->events->until(new Events\Looping($connectionName, $queue)) === false)

вызовет событие Events\Looping, и если какой-либо прослушиватель этого события вернет false, то цикл не будет продолжаться дальше. Поэтому, если вы хотите временно приостановить обработку заданий рабочим процессом, вы можете использовать для этого это событие.
Если условие возвращает значение true, оно вызывает метод pauseWorker(), который делает процесс sleep на заданную продолжительность и проверяет посмотрите, должно ли выполнение продолжаться или нет. Если рабочий процесс потребляет больше памяти, чем настроено, или вы установили флаг --stop-when-empty и в списке нет заданий для обработки, или вы выполнили команду queue:restart, то текущий процесс завершится. Еще одна интересная вещь заключается в том, что команда queue:restart на самом деле не перезапускает процесс, а просто завершает текущий процесс. У вас должен быть отдельный монитор процесса (супервизор), чтобы запустить резервное копирование рабочего процесса. Это работает следующим образом: когда процесс демона запускается, он извлекает время последнего перезапуска очереди из кеша и сохраняет его в локальной переменной. Теперь, если в какой-то момент значение в кеше отличается от значения в локальной переменной, рабочий процесс завершится. И все, что делает команда queue:restart, — это обновляет метку времени последнего перезапуска в кеше текущей меткой времени. Это приведет к выходу всех запущенных воркеров, и если у вас настроен супервизор, супервизор снова перезапустит этих воркеров.

В любом случае, в этот момент он вызовет метод getNextJob(), чтобы получить следующее задание из очереди для обработки. Прежде чем мы углубимся в детали метода getNextJob(), я хочу сначала обсудить, как реализуются тайм-ауты.

registerTimeoutHandler() зарегистрирует обработчик сигнала SIGALRM. Обработчик уничтожит текущий процесс, если возникнет тревога. В следующих строках настраивается аварийный сигнал, который будет срабатывать по истечении тайм-аута задания. Если вы установили тайм-аут в используемом классе заданий, в противном случае он будет использовать тайм-аут по умолчанию для рабочего. markJobAsFailedIfWillExceedMaxAttempt() приведет к сбою текущего задания, если оно превысило допустимое количество повторных попыток или было превышено время ожидания. Если задание не превысило максимальное количество попыток, завершится только рабочий процесс, и это задание будет снова выбрано другим рабочим процессом для повторной попытки. Что происходит, когда работа терпит неудачу, мы увидим позже. Теперь давайте рассмотрим, как задания извлекаются из очереди.

Помните, что для определения приоритета конкретной очереди вам нужно передать ее вот так --queue=high,low при запуске воркера? Задания из очереди high всегда будут обрабатываться до обработки заданий из очереди low. Вот как это происходит: после разнесения имени очереди по , имена очередей зацикливаются, и драйвер очереди пытается получить задание из первой доступной очереди с $connection->pop($queue). Это означает, что если в очереди с высоким приоритетом есть задания, задания из очередей с более низким приоритетом вообще не будут обрабатываться. Только после того, как очередь с высоким приоритетом опустеет, рабочий процесс будет получать задания из очередей с низким приоритетом. Я не уверен, каковы шансы, что это произойдет, но это то, что нужно иметь в виду.

Метод pop() реализован внутри класса Illuminate\Queue\RedisQueue. Первое, что он делает, — это перенос просроченных заданий из очереди delayed, и если вы настроили retry_after, а задание застряло на обработке дольше, чем это время, оно будет добавлено обратно в основную очередь для повторной обработки.

Помните, когда мы говорили об отправке отложенных заданий, о том, что отложенные задания добавляются в отсортированный набор со временем, когда оно должно быть доступно для обработки, в качестве оценки? Именно здесь эти задания возвращаются в основную очередь. Все задания, срок действия которых истек в настоящее время, извлекаются из очереди с задержкой и возвращаются в основную очередь. Если вы помните команду ZRANGEBYSCORE, просмотр Lua-скрипта прояснит для вас ситуацию. Извлеченные элементы также удаляются из предыдущей очереди с помощью команды ZREMRANGEBYSCORE. Об очереди reserved вы узнаете чуть позже.

После того, как все задания, которые могут быть добавлены в основную очередь, будут добавлены обратно в нее, теперь она будет извлекать следующее задание из очереди.

Чтобы убрать задание из очереди, запускается Lua-скрипт. В основном он выполняет три задачи:
1) Вызывает LPOP в основной список очереди.
2) Если LPOP возвращает задание, оно добавляется в очередь reserved. Если вы помните ранее, зарезервированные очереди также являются отсортированными множествами. Здесь срок действия зарезервированных заданий — это время после добавления продолжительностиretry_after к текущей метке времени. Как показано в предыдущем разделе, зарезервированные задания добавляются обратно в основную очередь после истечения срока их действия для повторной попытки. Каждый раз, когда задание добавляется в резервную очередь, счетчик попыток для этого задания увеличивается.
3) Элемент из списка notify удаляется.

До сих пор вы видели эту очередь notify повсюду, чтобы понять ее назначение, давайте посмотрим на следующий фрагмент из метода retrieveNextJob():

Laravel предоставляет параметр конфигурации block_for для очереди Redis из документации:

При использовании очереди Redis вы можете использовать параметр конфигурации block_for, чтобы указать, как долго драйвер должен ждать, пока задание станет доступным, прежде чем выполнить итерацию рабочего цикла и повторно опросить базу данных Redis.

Здесь в игру вступает очередь notify. Если вы указали опцию block_for в своей конфигурации, то воркер будет вызывать BLPOP в очереди notify, и этот вызов будет заблокирован на указанное время. Как мы видели во время диспетчеризации заданий, когда новое задание ставится в очередь, новый элемент также добавляется в очередь notify. Когда новый элемент добавляется в очередь notify, вызов BLPOP возвращается, и метод рекурсивно вызывает сам себя, чтобы выбрать новое задание из фактической очереди. Отлично, да?
Если из очереди не возвращается ни одно задание, вызовы метода просто возвращаются. В противном случае экземпляр Illuminate\Queue\Jobs\RedisJob создается и возвращается обратно в рабочий процесс. Теперь пришло время обработать задание.

Как только задание будет извлечено из очереди, рабочий демон вызовет метод runJob(), который, в свою очередь, вызовет метод process() и перехватит любое исключение или ошибку, возникшую во время обработки.

Сначала он вызовет событие перед заданием, а затем проверит, не превысило ли задание количество повторных попыток.

Во-первых, он извлекает параметр конфигурации максимального количества попыток, который настраивается либо в вашем классе заданий, либо во время запуска рабочего процесса. Вы также можете настроить продолжительность тайм-аута вместо максимального количества попыток, если продолжительность тайм-аута настроена так, что она имеет приоритет, и если текущее время не превысило продолжительность тайм-аута, считается, что оно подходит для обработки, независимо от того, сколько раз оно было предпринято ранее. В противном случае это приведет к сбою работы. Что происходит, когда задание терпит неудачу, мы рассмотрим это чуть позже, а сейчас просто знайте, что оно будет помечено как удаленное, а process() на предыдущем шаге проверит статус удаления и пропустит его обработку. Если все в порядке, то задание подходит для обработки и запускается вызовом $job->fire(). Прежде чем мы посмотрим, что происходит, когда задание запускается, позвольте мне сначала показать вам, что происходит, если внутри задания генерируется исключение.

Когда во время обработки задания возникает исключение, будь оно вызвано непосредственно из задания или откуда-то еще во время его обработки, оно сначала проверяет, достигнуто ли задание максимальное количество повторных попыток, и если это так, это приведет к сбою задания. Метод fail() можно найти в классе Illuminate\Queue\Jobs\Job.

Когда задание считается неудачным, это означает, что все доступные варианты повтора исчерпаны, и больше не будет предприниматься никаких дальнейших попыток обработать задание. Сбой задания включает в себя пометку его как неудавшегося и удаление. Метод $this->delete() переопределен в дочернем классе RedisJob, который вызывает метод deleteReserved для экземпляра RedisQueue. Как мы видели ранее, когда задание забирается из обработки, оно добавляется в зарезервированную очередь; теперь, когда он терпит неудачу, он полностью удаляется из очереди.

В качестве последнего шага обработки исключений, если задание может быть повторено позднее, оно снова помещается в очередь delayed. Вызов $job->release(), реализованный в RedisJob::release(), вызовет метод deleteAndRelease() в драйвере очереди RedisQueue..

Если вы настроили задержку для невыполненных заданий, задание будет доступно для обработки после этого периода задержки. В противном случае он будет доступен немедленно для повторной попытки на следующей итерации рабочего цикла. По существу, задание удаляется из очереди reserved и добавляется в очередь delayed после добавления длительности задержки в качестве срока действия.

Теперь, когда мы увидели, как обрабатываются сбои, осталось исследовать только то, как выполняется логика из класса задания. Этот процесс запускается методом $job->fire(). Этот метод определен в классеIlluminate\Queue\Jobs\Job.

Полезная нагрузка здесь — это необработанная полезная нагрузка, которая хранится в Redis. Если вы помните наше предыдущее обсуждение генерации полезной нагрузки во время диспетчеризации заданий, то вы знаете, что $payload['job'] был установлен в Illuminate\Queue\CallQueuedHandler@call, а $payload['data'] был массивом, содержащим полное имя класса задания и сериализованную версию экземпляра задания. Здесь начинается собственно обработка. Метод fire() просто создает экземпляр класса Illuminate\Queue\CallQueuedHandler и вызывает для него метод call(). Посмотрим, что там происходит.

Сначала он десериализует экземпляр задания, и если задание включает признак Illuminate\Queue\InteractsWithQueue, он установит экземпляр RedisJob в несериализованный экземпляр задания. Затем он вызовет метод dispatchNow() для файла Dispatcher. Теперь мы вернулись к Dispatcher. Этот же метод dispatchNow() вызывается, если вы отправили задание синхронно.

Этот метод в основном создает функцию обратного вызова, которая выполняет метод handle() для экземпляра класса задания и отправляет его по конвейеру. Конвейер Laravel выходит за рамки этого поста, но, по сути, он в конечном итоге вызывает обратный вызов, который, в свою очередь, выполняет метод handle. Таким образом, наше путешествие от отправки задания к его выполнению завершается. Если здесь возникает какое-либо исключение, оно обрабатывается описанной ранее обработкой исключений. И если задание выполняется успешно, элемент управления теперь возвращается к методу daemon() рабочего процесса, и он снова повторяет итерацию, пытаясь найти новое задание для обработки.

Напутствие

Надеюсь, вам понравилось это путешествие со мной. И я искренне надеюсь, что смог объяснить все эти детали так хорошо, как хотел. Если что-то непонятно, вы можете спросить в комментариях, или вы можете засучить рукава и покопаться в исходном коде, как это сделал я. В конце концов, это определенно полезное путешествие. Чтобы увидеть команды Redis, выполняемые системой очередей, вы можете запустить redis-cli monitor, и вы сможете увидеть все команды, которые обрабатываются сервером Redis. Дайте мне знать, что вы думаете об этой статье в разделе комментариев, и если у вас есть какие-либо отзывы, это тоже приветствуется. А пока, сайонара.