celery + eventlet = 100% загрузка ЦП

Мы используем celery для получения данных о рейсах от разных туристических агентств, каждый запрос занимает ~ 20-30 секунд (большинство агентств требуют последовательности запросов - авторизоваться, отправить запрос, опросить результаты).

Обычная задача сельдерея выглядит так:

from eventlet.green import urllib2, time 
def get_results(attr, **kwargs): 
    search, provider, minprice = attr 
    data = XXX # prepared data 
    host = urljoin(MAIN_URL, "RPCService/Flights_SearchStart") 
    req = urllib2.Request(host, data, {'Content-Type': 'text/xml'}) 
    try: 
        response_stream = urllib2.urlopen(req) 
    except urllib2.URLError as e: 
        return [search, None] 
    response = response_stream.read() 
    rsp_host = urljoin(MAIN_URL, "RPCService/FlightSearchResults_Get") 
    rsp_req = urllib2.Request(rsp_host, response, {'Content-Type': 
'text/xml'}) 
    ready = False 
    sleeptime = 1 
    rsp_response = '' 
    while not ready: 
        time.sleep(sleeptime) 
        try: 
            rsp_response_stream = urllib2.urlopen(rsp_req) 
        except urllib2.URLError as e: 
            log.error('go2see: results fetch failed for %s IOError %s'% 
(search.id, str(e))) 
        else: 
            rsp_response = rsp_response_stream.read() 
            try: 
                rsp = parseString(rsp_response) 
            except ExpatError as e: 
                return [search, None] 
            else: 
                ready = rsp.getElementsByTagName('SearchResultEx') 
[0].getElementsByTagName('IsReady')[0].firstChild.data 
                ready = (ready == 'true') 
        sleeptime += 1 
        if sleeptime > 10: 
            return [search, None] 
    hash = "%032x" % random.getrandbits(128) 
    open(RESULT_TMP_FOLDER+hash, 'w+').write(rsp_response) 
   # call to parser 
    parse_agent_results.apply_async(queue='parsers', args=[__name__, 
search, provider, hash]) 

Эти задачи выполняются в пуле событий с параллелизмом 300, prefetch_multiplier = 1, broker_limit = 300. Когда ~100-200 задач выбираются из очереди - загрузка ЦП возрастает до 100% (используется все ядро ​​ЦП), а выборка задач из очереди выполняется с задержками.

Не могли бы вы указать на возможные проблемы - блокировка операций (eventlet ALARM DETECTOR не дает исключений), неправильная архитектура или что-то еще.


person Andrew    schedule 14.03.2012    source источник
comment
Я изучал сообщения о высокой загрузке ЦП, которая возникает только после обновления до версии 2.5.x. Я еще не смог воспроизвести, так как не получил ни одного примера. Но, может быть, это поможет, я постараюсь и отчитаюсь. Спасибо!   -  person asksol    schedule 04.04.2012


Ответы (2)


Проблема возникает, если вы отправляете 200 запросов к серверу, ответы могут быть задержаны, и поэтому urllib.urlopen будет зависать.

Еще одна вещь, которую я заметил: если возникает URLError, программа остается в цикле while до тех пор, пока время сна не превысит 10. Таким образом, ошибка URLError позволит этому сценарию спать в течение 55 секунд (1+2+3.. и т. д.)

person Willian    schedule 14.03.2012
comment
urllib.urlopen - должна быть проблема - основная цель, почему я использовал eventlet, - избежать зависаний. вторая проблема, о которой вы упомянули, не реальна, это просто из-за упрощения кода перед публикацией - person Andrew; 14.03.2012

Простите за поздний ответ.

В такой ситуации я бы сначала попробовал полностью отключить Eventlet как в Celery, так и в вашем коде, использовать модель процесса или потока ОС. 300 потоков или даже процессов — это не такая большая нагрузка для планировщика ОС (хотя вам может не хватать памяти для запуска многих процессов). Так что я бы попробовал и посмотрел, сильно ли упадет нагрузка на процессор. Если это не так, проблема в вашем коде, и Eventlet не может волшебным образом ее исправить. Однако, если он упадет, нам нужно будет изучить проблему более подробно.

Если ошибка не устранена, пожалуйста, сообщите об этом одним из следующих способов:

person temoto    schedule 09.01.2013