У меня есть следующий код.
Здесь используется модуль Python под названием decorator .
from multiprocessing import Pool
from random import randint
import traceback
import decorator
import time
def test_retry(number_of_retry_attempts=1, **kwargs):
timeout = kwargs.get('timeout', 2.0) # seconds
@decorator.decorator
def tryIt(func, *fargs, **fkwargs):
for _ in xrange(number_of_retry_attempts):
try: return func(*fargs, **fkwargs)
except:
tb = traceback.format_exc()
if timeout is not None:
time.sleep(timeout)
print 'Catching exception %s. Attempting retry: '%(tb)
raise
return tryIt
Модуль декоратора помогает мне украшать функции вызова хранилища данных. Поэтому мне не нужно заботиться об обрыве соединения и различных проблемах, связанных с соединением, и позволять мне сбросить соединение и повторить попытку после некоторого тайм-аута. Я украшаю все свои функции, которые выполняют чтение хранилища данных, этим методом, поэтому я получаю повторную попытку бесплатно.
У меня есть следующие методы.
def process_generator(data):
#Process the generated data
def generator():
data = data_warhouse_fetch_method()#This is the actual method which needs retry
yield data
@test_retry(number_of_retry_attempts=2,timeout=1.0)
def data_warhouse_fetch_method():
#Fetch the data from data-warehouse
pass
Я пытаюсь обработать свой код, используя многопроцессорный модуль, подобный этому.
try:
pool = Pool(processes=2)
result = pool.imap_unordered(process_generator,generator())
except Exception as exception:
print 'Do some post processing stuff'
tb = traceback.format_exc()
print tb
Все нормально, когда все идет успешно. Также все нормально, когда он исправляет себя в течение количества повторных попыток. Но как только количество повторов превышается, я поднимаю исключение в методе test_retry, которое не попадает в основной процесс. Процесс умирает, а процессы, разветвленные основным процессом, остаются сиротами. Может быть, я делаю что-то неправильно здесь. Прошу помощи в решении следующей проблемы. Распространить исключение на родительский процесс, чтобы я мог обработать исключение и сделать так, чтобы мои дети умерли изящно. Также я хочу знать, как сообщить дочерним процессам, чтобы они изящно умирали. Заранее спасибо за помощь .
Изменить: добавлен дополнительный код для объяснения.
def test_retry(number_of_retry_attempts=1, **kwargs):
timeout = kwargs.get('timeout', 2.0) # seconds
@decorator.decorator
def tryIt(func, *fargs, **fkwargs):
for _ in xrange(number_of_retry_attempts):
try: return func(*fargs, **fkwargs)
except:
tb = traceback.format_exc()
if timeout is not None:
time.sleep(timeout)
print 'Catching exception %s. Attempting retry: '%(tb)
raise
return tryIt
@test_retry(number_of_retry_attempts=2,timeout=1.0)
def bad_method():
sample_list =[]
return sample_list[0] #This will result in an exception
def process_generator(number):
if isinstance(number,int):
return number+1
else:
raise
def generator():
for i in range(20):
if i%10 == 0 :
yield bad_method()
else:
yield i
try:
pool = Pool(processes=2)
result = pool.imap_unordered(process_generator,generator())
pool.close()
#pool.join()
for r in result:
print r
except Exception, e: #Hoping the generator will catch the exception. But not .
print 'got exception: %r, terminating the pool' % (e,)
pool.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
pool.join()
print 'join complete'
print 'the end'
Фактическая проблема сводится к тому, что если генератор выдает исключение, я не могу поймать исключение, выброшенное генератором, в предложении exclude, которое обернуто вокруг метода pool.imap_unordered(). Итак, после того, как исключение выброшено, основной процесс застревает, а дочерний процесс ждет вечно. Не уверен, что я здесь делаю неправильно.
None
вместо исключения? - person User   schedule 14.11.2014finally: pool.join()
- person User   schedule 14.11.2014Traceback (most recent call last): File "test_exception.py", line 48, in <module> pool.join() File "/Users/senthilsrinivasan/.localpython/lib/python2.7/multiprocessing/pool.py", line 456, in join assert self._state in (CLOSE, TERMINATE)
- person Senthil   schedule 14.11.2014pool.close()
. См. этот пример. - person User   schedule 15.11.2014