Обработка групп результатов с Vertx - как координировать действия?

У меня есть система обработки заданий, в которой каждое задание содержит тысячи отдельных задач, для выполнения которых требуются разные стратегии. Индивидуальные задачи составляют всю работу. Если все задачи были выполнены, задание помечается как успешно завершенное, и выполняются другие шаги, если какая-либо из задач не выполняется, задание должно быть помечено как сбойное и предпринимаются другие шаги, если время ожидания задания истекает, задание должно быть отмечено как не удалось и другие шаги предпринимаются.

Как только все результаты для задания будут получены, можно будет выбрать следующее задание. Следующее задание не следует загружать, пока оно обрабатывается.

Вот как выглядит поток:

введите здесь описание изображения

Вертикаль опроса заданий публикует задание в шине событий, а Вертикаль обработки заданий публикует каждую задачу в шине событий. Когда стратегия работы завершается, она публикует результат задачи в шине событий.

Проблема в том, что я не знаю правильного способа определить, когда все задачи выполнены в этой модели. Все статьи не имеют состояния, Вертикаль обработки заданий не ожидает какого-либо будущего, и даже если Вертикаль результатов задания была с отслеживанием состояния, она не знает, сколько результатов следует ожидать.

Единственный способ, которым я могу это сделать, - это иметь глобальный объект с отслеживанием состояния. Но я не думаю, что это хороший дизайн.

введите здесь описание изображения

Кроме того, мне нужно знать, когда для задания истекло время. То есть он работает дольше, чем должен, и мне нужно признать его неудачным, зарегистрировать его и двигаться дальше.

Я мог бы сделать это с помощью глобального состояния, но опять же, я не думаю, что это правильное решение.

Имеет ли смысл этот узор по вертикали для того, что я пытаюсь сделать?


person micah    schedule 18.09.2019    source источник


Ответы (1)


Во-первых, позвольте мне ответить на ваши вопросы. Затем я постараюсь объяснить, в чем проблемы этой конструкции.

Проблема в том, что я не знаю, как правильно определить, когда все задачи выполнены в этой модели. Все статьи не имеют состояния, Вертикаль обработки заданий не ожидает никаких фьючерсов, и даже если статья результатов задания была с сохранением состояния, она не знает, сколько результатов следует ожидать.

Решением может стать подсчет ссылок по вертикали. Каждый рабочий должен выдавать start message на шине событий с jobId при запуске и end message с jobId по завершении. Даже если у вас есть разветвление (это те случаи, когда вы не знаете, сколько есть рабочих), счетчик по вертикали будет знать это. На вашей диаграмме "Вертикаль постобработки вакансии" является хорошим кандидатом для этого. Он может поддерживать счетчик и только когда он достигает нуля, он должен начать следующее задание. Это также помогает избежать фактического совместного использования некоторой ссылки на память.

Кроме того, мне нужно знать, когда истекло время ожидания задания. То есть он работает дольше, чем должен, и мне нужно признать его неудачным, зарегистрировать его и двигаться дальше.

В той же статье вы можете запускать таймер каждый раз, когда получаете новый start message. Если вы получили end message, отключите таймер. В противном случае отмените текущее задание и начните заново.

Теперь это решение будет работать, но у конструкции есть два основных недостатка. Во-первых, кажется, что вы сохраняете весь свой поток в памяти. Если ваше приложение вылетает, весь прогресс теряется, и неясно, как вы его записываете. Возможно, опрос Jobs таблицы в БД действительно был бы лучше, так как ваше задание в любом случае выполняется последовательно.

Второй момент заключается в том, что все эти тайм-ауты и подсчет ссылок - это самодельная реализация структурированного параллелизма. Возможно, вам стоит взглянуть на что-то вроде сопрограмм Kotlin для этого, поскольку они решат многие ваши проблемы за вас.

person Alexey Soshin    schedule 19.09.2019
comment
Все это имеет смысл. Я понимаю вашу точку зрения по поводу отказоустойчивости. У нас есть очередь, но, к сожалению, мы не можем воспроизвести задачи после того, как они были выполнены. Таким образом, если задание частично завершается до того, как завершится сбоем, нам просто нужно пометить задание как неудавшееся и продолжить. Если бы мы могли воспроизвести задания, мы бы не удаляли сообщение из очереди и не пытались бы повторить его после тайм-аута видимости. - person micah; 19.09.2019