Работая над недавним проектом, я понял, что тяжелые процессы для Python, такие как удаление отходов, можно упростить, если использовать многопроцессорную библиотеку Python. Документация и сообщество, занимающееся многопроцессорной обработкой, довольно скудны, поэтому я хотел поделиться некоторыми своими знаниями на примере проекта по отказу от «PokéAPI. Ниже я написал немного кода, который извлекает всех доступных покемонов, учитывая ограничение API в 100 вызовов за 60 секунд. Вы увидите, что итерация идет довольно медленно, так как API возвращает 964 покемона.

Перед многопроцессорной обработкой

Я просто создаю три вызова get_number_pokemon, get_pokemon и get_all_pokemon. Первый, get_number_pokemon, просто возвращает все URL-адреса из API для следующего процесса, get_all_pokemon, для перебора URL-адресов и объединения информации с помощью get_pokemon для получения данных ответа каждого URL-адреса.

import requests as req
import timeit
import time
import pandas as pd
from IPython.display import Image, HTML
import random
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry
## Rate limit to help with overcalling
## pokemon api is 100 calls per 60 seconds max
@sleep_and_retry
@limits(calls=100, period=60)
def call_api(url):
    response = req.get(url)
    if response.status_code == 404:
        return 'Not Found'
    if response.status_code != 200:
        print('here', status_code, url)
        raise Exception('API response: {}'.format(response.status_code))
    return response

API_POKEMON = 'https://pokeapi.co/api/v2/pokemon/{pokemon}'
def get_number_pokemon():
    res = req.get(API_POKEMON.format(pokemon=''))
    number_pokemon = res.json()['count']
    res_url = call_api(API_POKEMON.format(pokemon='?offset=0&limit={limit}'.format(limit=str(number_pokemon))))
    pokemon_links_values = [link['url'] for link in res_url.json()['results']]
    return pokemon_links_values
def get_pokemon(link=''):
    
    info = None
    resolved = False
    
    try:
        while not resolved:
            
            res = None
            tooManyCalls = False
            try:
                res = call_api(link)
                if res == 'Not Found':
                    resolved = True
                    break
            except Exception as e:
                print(e)
                if e == 'too many calls':
                    tooManyCalls =True
            if tooManyCalls:
                time.sleep(60)
                    
            elif res.status_code < 300:
                pokemon_info = res.json()
                info = {
                    'Image' : pokemon_info['sprites']['front_default'],
                    'id' : pokemon_info['id'],
                    'name' : pokemon_info['name'],
                    'height' : pokemon_info['height'],
                    'base_experience' : pokemon_info['base_experience'],
                    'weight' : pokemon_info['weight'],
                    'species' : pokemon_info['species']['name']
                }
                resolved = True
            elif res.status_code == 429:
                time.sleep(60)
            else:
                sleep_val = random.randint(1,10)
                time.sleep(sleep_val)
                
    except Exception as e:
        print(e)
        return info
    finally:
        return info
            
def get_all_pokemon(links_pokemon=None):
    
    
    list_pokemon = []
    for link in tqdm(links_pokemon):
        
        pokemon = get_pokemon(link)
        if pokemon != None:
            list_pokemon.append(pokemon)
        time.sleep(0.3)
        
            
    pd.set_option('display.max_colwidth', None)
    df_pokemon = pd.DataFrame(list_pokemon)
      
    return df_pokemon
    
def image_formatter(im):
    return f'<img src="{im}">'
def main_pokemon_run():
    links_pokemon = get_number_pokemon()
    df_pokemon = get_all_pokemon(links_pokemon=links_pokemon)
    
    df_pokemon.sort_values(['id'],inplace=True)
    return df_pokemon, HTML(df_pokemon.iloc[0:4].to_html(formatters={'Image': image_formatter}, escape=False))

Как видно из приведенного ниже ответа, вызов API занял примерно 9 минут 30 секунд со скоростью 1,69 итерации в секунду. Мы можем значительно улучшить это, добавив многопроцессорность.

С многопроцессорностью

Теперь с многопроцессорностью мы можем разделить функцию get_all_pokemon на функцию многопроцессорного пула. Мы используем встроенную многопроцессорную функцию cpu_count(), чтобы определить необходимое количество рабочих. Поскольку мы хотим сделать это как можно быстрее, использование полного cpu_count - 1 обеспечит наиболее оптимальную работу программы, не занимая все наши CPUs. max(cpu_count() -1, 1) гарантирует, что эта функция никогда не будет равна 0. Затем я использую встроенный в многопроцессорную обработку инструмент Manager, чтобы совместно использовать глобальную память наших возвращенных покемонов.

Хотя вы можете использовать возвращаемое значение многопроцессорной карты, использование менеджера — хороший способ использовать многопроцессорность для других более сложных задач. Ознакомьтесь с документацией по доступным параметрам многопроцессорного менеджера.

Типы менеджеров. Еще один необходимый шаг — создать частичную функцию. Поскольку функция pool.imap передает массив для итерации, нам нужно добавить наши параметры более конкретно в нашу функцию get_pokemon_multiprocess, прежде чем мы будем ее итерировать. Библиотека functools.partial позволяет нам сделать это, создав частичную функцию для отправки на карту пула. В этом примере я использую tqdm, чтобы увидеть прогресс пула.

Я также показываю в коде ту же функцию без него. Теперь, когда пул запущен, нам нужно убедиться, что он и закрыт, и присоединен, как вы увидите, я делаю это сразу после и в блоке finally. Это делается для проверки того, что даже в случае ошибки рабочие процессы закрываются и завершаются.

Функция объединения необходима для объединения процессов после объединения в пул. Это также подтверждает, что менеджер полностью завершил привязку.

import requests as req
import timeit
import time
import pandas as pd
from IPython.display import Image, HTML
import random
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry
from multiprocessing import Pool, Manager, cpu_count
from functools import partial

API_POKEMON = 'https://pokeapi.co/api/v2/pokemon/{pokemon}'
#  To see how it ran
# def infoDebugger(title):
#     print(title)
#     print('module name:', __name__)
#     if hasattr(os, 'getppid'):
#         print('parent process:', os.getppid())
#     print('process id:', os.getpid())

@sleep_and_retry
@limits(calls=100, period=60)
def call_api(url):
    response = req.get(url)
    
    if response.status_code == 404:
        return 'Not Found'
    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response

# https://docs.python.org/2/library/multiprocessing.html
def get_number_pokemon():
    res = req.get(API_POKEMON.format(pokemon=''))
    number_pokemon = res.json()['count']
    res_url = call_api(API_POKEMON.format(pokemon='?offset=0&limit={limit}'.format(limit=str(number_pokemon))))
    pokemon_links_values = [link['url'] for link in res_url.json()['results']]
    return pokemon_links_values
def get_pokemon_multiprocess(listManager=None, links_pokemon=None, process=0):
#     print('Called Pokemon', process)
    link = links_pokemon[process]
    info = None
    resolved = False
#     print(link)
    
    try:
        while not resolved:
              
            res = None
            tooManyCalls = False
            
            try:
                res = call_api(link)
                if res == 'Not Found':
                    resolved = True
                    break
            except Exception as e:
                print(e)
                if e == 'too many calls':
                    tooManyCalls =True
                    
            if tooManyCalls:
                time.sleep(60)
                
            elif res.status_code < 300:
                pokemon_info = res.json()
                info = {
                    'Image' : pokemon_info['sprites']['front_default'],
                    'id' :  pokemon_info['id'],
                    'name' : pokemon_info['name'],
                    'height' : pokemon_info['height'],
                    'base_experience' : pokemon_info['base_experience'],
                    'weight' : pokemon_info['weight'],
                    'species' : pokemon_info['species']['name']
                }
                resolved = True
                
            elif res.status_code == 429:
                print(res.status_code)
                time.sleep(60)
            else:
                print(res.status_code)
                sleep_val = random.randint(1,10)
                time.sleep(sleep_val)
                
    except Exception as e:
        print(e)
    finally:
        if info != None:
            listManager.append(info)
            time.sleep(0.5)
            return

def image_formatter(im):
    return f'<img src="{im}">'

def main_pokemon_run_multiprocessing():
    ## cannot be 0, so max(NUMBER,1) solves this
    workers = max(cpu_count()-1,1)
    ## create the pool
    manager = Manager()
    
    ## Need a manager to help get the values async, the values will be updated after join
    listManager = manager.list()
    pool = Pool(workers)
    try:
        links_pokemon = get_number_pokemon()
        part_get_clean_pokemon = partial(get_pokemon_multiprocess, listManager, links_pokemon)
#         could do this the below is visualize the rate success /etc
#         pool.imap(part_get_clean_pokemon, list(range(0, len(links_pokemon))))
#         using tqdm to see progress imap works
        for _ in tqdm(pool.imap(part_get_clean_pokemon, list(range(0, len(links_pokemon)))), total=len(links_pokemon)):
            pass
        pool.close()
        pool.join()
    finally:
        pool.close()
        pool.join()
        
    pokemonList = list(listManager)
    
    df_pokemon = pd.DataFrame(pokemonList)
    df_pokemon.sort_values(['id'],inplace=True)
    return df_pokemon, HTML(df_pokemon.iloc[0:4].to_html(formatters={'Image': image_formatter}, escape=False))

На приведенном ниже снимке вы увидите, что объединение в пул работало успешно и значительно улучшило время выполнения и время итерации с 9 минут до 1 минуты, а число итераций в секунду — до 14,97 итераций в секунду.

Надеюсь, это было полезно. Чтобы увидеть полный код и проверить мою записную книжку на github.

Дайте мне знать, если вы найдете какое-либо другое полезное применение для этого, и, если у вас есть, поделитесь отзывами о моем образце.

Грейсон Невинс-Арчер