Приоритет выполнения одних рабочих процессов над другими

Я использовал структуру потока для amazon swf, и я хочу иметь возможность запускать выполнение приоритетных рабочих процессов и выполнение обычных рабочих процессов. Если есть приоритетные задачи, тогда действия должны поднимать приоритетные задачи перед обычными приоритетными задачами. Как лучше всего этого добиться?

Я думаю, что следующее может сработать, но мне интересно, есть ли лучший / рекомендуемый подход.

  1. Я определю два Activity Workers и два списка действий для действия. Один список приоритетов и один обычный список. Каждый рабочий будет использовать один и тот же класс активности.
  2. Оба воркера будут запущены на одном хосте (экземпляр ec2).
  3. В рабочем процессе я определю два метода: startNormalWorkflow и startHighWorkflow. В методе startHighWorkflow я могу использовать ActivitySchedulingOptions, чтобы поместить задачу в список с высоким приоритетом.

Проблема с этим подходом заключается в том, что нет гарантии, что задача с высоким приоритетом будет запланирована раньше обычных задач.


person Nithin    schedule 12.09.2013    source источник


Ответы (2)


Это хороший вопрос, я некоторое время почесал в затылке.

Конечно, есть несколько способов снять шкуру с этой кошки, и существует ряд действенных решений. Я сосредоточился здесь на простейшем из возможных вариантов, а именно на выполнении задач в порядке приоритета в рамках одного рабочего процесса.

Сценарий выглядит следующим образом: я определяю одного рабочего процесса, обслуживающего два списка задач, default_tasks и urgent_tasks, с тривиальной логикой:

  1. Если в списке urgent_tasks есть незавершенные задачи, выберите одну из них,
  2. В противном случае выберите задачу из default_tasks
  3. Выполнить любую выбранную задачу.

Вопрос в том, как проверить, не отложены ли какие-либо высокоприоритетные задачи? На помощь приходит API CountPendingActivityTasks!

Я знаю, что вы используете Flow для разработки. Мой пример написан с использованием boto.swf.layer2, поскольку Python намного проще создавать прототипы, но идея остается той же и может быть расширена до более сложного сценария с выполнением рабочего процесса с высоким и низким приоритетом.

Итак, чтобы выполнить вышеуказанное с помощью boto.swf, выполните следующие действия:

Экспорт учетных данных в среду

$ export AWS_ACCESS_KEY_ID=your access key
$ export AWS_SECRET_ACCESS_KEY= your secret key 

Получите фрагменты кода

Для удобства его можно форкнуть с github:

$ git clone [email protected]:oozie/stackoverflow.git
$ cd stackoverflow/amazon-swf/priority_tasks/

Чтобы запустить домен и рабочий процесс:

# domain_setup.py 
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name='SomeActivity', version=VERSION, task_list='default_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

Реализация решения:

# decider.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY = 'SomeActivity'
VERSION = '1.0'

class MyWorkflowDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        print history
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]

            decisions = swf.Layer1Decisions()

            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']

            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task(ACTIVITY+'1', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'2', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'3', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'4', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'5', ACTIVITY, VERSION, task_list='default_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Complete workflow execution after 5 completed activities.
                closed_activity_count = sum(1 for wf_event in workflow_events if wf_event.get('eventType') == 'ActivityTaskCompleted')
                if closed_activity_count == 5:
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

Приоритет реализации воркера:

# worker.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
VERSION = '1.0'

class PrioritizingWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION

    def run(self):

        urgent_task_count = swf.Domain(name=DOMAIN).count_pending_activity_tasks('urgent_tasks').get('count', 0)
        if urgent_task_count > 0:
            self.task_list = 'urgent_tasks'
        else:
            self.task_list = 'default_tasks'
        activity_task = self.poll()

        if 'activityId' in activity_task:
            print urgent_task_count, 'urgent tasks in the queue. Executing ' + activity_task.get('activityId')
            self.complete()
            return True

Запустите рабочий процесс из трех экземпляров интерактивной оболочки Python

Запускаем решающий:

$ python -i decider.py
>>> while MyWorkflowDecider().run(): pass
... 

Начать выполнение:

$ python -i decider.py 
>>> swf.WorkflowType(domain='stackoverflow', name='MyWorkflow', version='1.0', task_list='default_tasks').start()

Наконец, отключите воркера и наблюдайте за выполнением задач:

$ python -i worker.py 
>>> while PrioritizingWorker().run(): pass
... 
2 urgent tasks in the queue. Executing SomeActivity2
1 urgent tasks in the queue. Executing SomeActivity4
0 urgent tasks in the queue. Executing SomeActivity5
0 urgent tasks in the queue. Executing SomeActivity1
0 urgent tasks in the queue. Executing SomeActivity3
person oozie    schedule 13.09.2013
comment
спасибо за такой подробный ответ. Мы решили разместить рабочих на отдельных экземплярах, чтобы мы могли автоматически масштабировать их независимо. - person Nithin; 16.09.2013
comment
Спасибо. Удовольствие мое. - person oozie; 16.09.2013

Оказывается, использование отдельного списка задач, который нужно сначала проверить, не работает.

Есть пара проблем.

Во-первых, API-интерфейс count не обновляется надежно. Таким образом, вы можете получить 0 задач, даже если в очереди есть срочные задачи.

Во-вторых, вызов, который опрашивает задачи, зависает, если нет доступных задач. Таким образом, когда вы проводите опрос на предмет несрочных задач, он будет «задерживаться» либо на 2 минуты, либо до тех пор, пока у вас не появится несрочное задание.

Так что это может вызвать всевозможные проблемы в вашем рабочем процессе.

Чтобы это работало, SWF должен реализовать API опроса, который мог бы возвращать первую задачу из списка списков задач. Тогда было бы намного проще.

person user2981278    schedule 10.11.2014
comment
Спасибо за комментарий. Первый пункт кажется проблемой с сервисом, которую надо исправить. Люди из Amazon отлично реагируют на отчеты пользователей, я рекомендую связаться с ними, например через форум AWS. По второму пункту - вы правы - приведенный выше пример слишком упрощен для объяснения. Эту задержку можно уменьшить, также протестировав «default_tasks» на предмет невыполненных count_pending_activity_tasks. - person oozie; 06.04.2015