Многопроцессорная обработка Python. Обработать исключение в родительском процессе и сделать так, чтобы все дочерние элементы изящно умирали

У меня есть следующий код.

Здесь используется модуль Python под названием decorator .

from multiprocessing import Pool
from random import randint
import traceback
import decorator
import time


def test_retry(number_of_retry_attempts=1, **kwargs):
    timeout = kwargs.get('timeout', 2.0) # seconds
    @decorator.decorator
    def tryIt(func, *fargs, **fkwargs):
        for _ in xrange(number_of_retry_attempts):
            try: return func(*fargs, **fkwargs)
            except:
                tb = traceback.format_exc()
                if timeout is not None:
                    time.sleep(timeout)
                print 'Catching exception %s. Attempting retry: '%(tb)

        raise
    return tryIt

Модуль декоратора помогает мне украшать функции вызова хранилища данных. Поэтому мне не нужно заботиться об обрыве соединения и различных проблемах, связанных с соединением, и позволять мне сбросить соединение и повторить попытку после некоторого тайм-аута. Я украшаю все свои функции, которые выполняют чтение хранилища данных, этим методом, поэтому я получаю повторную попытку бесплатно.

У меня есть следующие методы.

def process_generator(data):
    #Process the generated data


def generator():
    data = data_warhouse_fetch_method()#This is the actual method which needs retry
    yield data

@test_retry(number_of_retry_attempts=2,timeout=1.0)
def data_warhouse_fetch_method():
    #Fetch the data from data-warehouse
    pass

Я пытаюсь обработать свой код, используя многопроцессорный модуль, подобный этому.

try:
    pool = Pool(processes=2)
    result = pool.imap_unordered(process_generator,generator())
except Exception as exception:
    print 'Do some post processing stuff'
    tb = traceback.format_exc()
    print tb 

Все нормально, когда все идет успешно. Также все нормально, когда он исправляет себя в течение количества повторных попыток. Но как только количество повторов превышается, я поднимаю исключение в методе test_retry, которое не попадает в основной процесс. Процесс умирает, а процессы, разветвленные основным процессом, остаются сиротами. Может быть, я делаю что-то неправильно здесь. Прошу помощи в решении следующей проблемы. Распространить исключение на родительский процесс, чтобы я мог обработать исключение и сделать так, чтобы мои дети умерли изящно. Также я хочу знать, как сообщить дочерним процессам, чтобы они изящно умирали. Заранее спасибо за помощь .

Изменить: добавлен дополнительный код для объяснения.

def test_retry(number_of_retry_attempts=1, **kwargs):
    timeout = kwargs.get('timeout', 2.0) # seconds
    @decorator.decorator
    def tryIt(func, *fargs, **fkwargs):
        for _ in xrange(number_of_retry_attempts):
            try: return func(*fargs, **fkwargs)
            except:
                tb = traceback.format_exc()
                if timeout is not None:
                    time.sleep(timeout)
                print 'Catching exception %s. Attempting retry: '%(tb)
        raise
    return tryIt

@test_retry(number_of_retry_attempts=2,timeout=1.0)
def bad_method():
    sample_list =[]
    return sample_list[0] #This will result in an exception


def process_generator(number):
    if isinstance(number,int):
        return number+1
    else:
        raise

def generator():
    for i in range(20):
        if i%10 == 0 :
         yield bad_method()
        else:
            yield i

try:
    pool = Pool(processes=2)
    result = pool.imap_unordered(process_generator,generator())
    pool.close()
    #pool.join()
    for r in result:
        print r
except Exception, e: #Hoping the generator will catch the exception. But not .
    print 'got exception: %r, terminating the pool' % (e,)
    pool.terminate()
    print 'pool is terminated'
finally:
    print 'joining pool processes'
    pool.join()
    print 'join complete'
print 'the end'

Фактическая проблема сводится к тому, что если генератор выдает исключение, я не могу поймать исключение, выброшенное генератором, в предложении exclude, которое обернуто вокруг метода pool.imap_unordered(). Итак, после того, как исключение выброшено, основной процесс застревает, а дочерний процесс ждет вечно. Не уверен, что я здесь делаю неправильно.


person Senthil    schedule 14.11.2014    source источник
comment
Как это выглядит, когда процесс умирает. Любое исключение? Он замерзает? Вы пытались вернуть None вместо исключения?   -  person User    schedule 14.11.2014
comment
Он генерирует исходное исключение, и родительский процесс умирает, оставляя дочерний процесс разветвленным как сироту.   -  person Senthil    schedule 14.11.2014
comment
Что если добавить finally: pool.join()   -  person User    schedule 14.11.2014
comment
То же, что и раньше, после добавления finally:pool.join . Родительский процесс умирает, а дочерний процесс остается сиротой. Единственная другая вещь, которая отличается, это то, что у меня есть следующая трассировка .Traceback (most recent call last): File "test_exception.py", line 48, in <module> pool.join() File "/Users/senthilsrinivasan/.localpython/lib/python2.7/multiprocessing/pool.py", line 456, in join assert self._state in (CLOSE, TERMINATE)   -  person Senthil    schedule 14.11.2014
comment
Затем вам нужно сначала позвонить pool.close(). См. этот пример.   -  person User    schedule 15.11.2014
comment
Я попробовал это и обновил свой первоначальный вопрос более конкретным примером. Процесс, отвечающий за потребление генератора, выдает исключение, но я не смог поймать это исключение в основном процессе. Таким образом, когда возникает исключение, процесс не сигнализирует об окончании работы дочернему процессу, который застрял на неопределенный срок. ЕСЛИ есть способ поймать это исходное исключение в основном процессе?   -  person Senthil    schedule 16.11.2014
comment
Я не могу запустить код: @decorator.decorator NameError: глобальное имя «декоратор» не определено   -  person User    schedule 20.11.2014
comment
decorator — это внешний пакет Python, доступный через easy_install. Для его установки можно использовать декоратор easy_install. Спасибо .   -  person Senthil    schedule 20.11.2014


Ответы (1)


Я не полностью понимаю код, который был опубликован здесь, поскольку я не эксперт. Тем более, что этому вопросу почти год. Но у меня было такое же требование, как описано в теме. И мне удалось найти решение:

import multiprocessing
import time


def dummy(flag):
    try:
        if flag:
            print('Sleeping for 2 secs')
            time.sleep(2)  # So that it can be terminated
        else:
            raise Exception('Exception from ', flag) # To simulate termination
        return flag  # To check that the sleeping thread never returns this
    except Exception as e:
        print('Exception inside dummy', e)
        raise e
    finally:
        print('Entered finally', flag)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    args_list = [(1,), (0,)]
    # call dummy for each tuple inside args_list. 
    # Use error_callback to terminate the pool
    results = pool.starmap_async(dummy, args_list, 
                                error_callback=lambda e, mp_pool=pool: mp_pool.terminate())
    pool.close()
    pool.join()
    try:
        # Try to see the results.
        # If there was an exception in any process, results.get() throws exception
        for result in results.get():
            # Never executed cause of the exception
            print('Printing result ', result)  
    except Exception as e:
        print('Exception inside main', e)

    print('Reached the end')

Это дает следующий результат:

Sleeping for 2 secs
Exception inside dummy ('Exception from ', 0)
Entered finally 0
Exception inside main ('Exception from ', 0)
Reached the end

Это практически первый раз, когда я отвечаю на вопрос, поэтому заранее извиняюсь, если я нарушил какие-либо правила или допустил какие-либо ошибки.

Я безуспешно пытался сделать следующее:

  1. Используйте apply_async. Но это просто зависло основной процесс после того, как было выбрано исключение
  2. Попробуйте убить процессы и дочерние процессы, используя pid в error_callback
  3. Используйте multiprocessing.event для отслеживания исключений и проверяйте их во всех процессах после каждого шага, прежде чем продолжить. Не очень хороший подход, но и он не сработал: «Объекты условий должны быть разделены между процессами только посредством наследования».

Я искренне хотел бы, чтобы не было так сложно завершить все процессы в одном пуле, если один из процессов выдал исключение.

person Suhas K    schedule 18.12.2015