Параллелизм можно рассматривать как несколько вещей, происходящих одновременно.

Представьте, что вы устраиваете званый обед, и вам нужно приготовить несколько блюд одновременно, чтобы угостить всех гостей. Один из способов сделать это — готовить каждое блюдо одно за другим, но это займет много времени, и вашим гостям придется долго ждать, чтобы поесть.

Лучший способ справиться с этим — использовать несколько конфорок на плите и одновременно готовить несколько блюд. Каждая конфорка представляет собой отдельную задачу, и вы можете использовать их для одновременного приготовления разных блюд. Таким образом, вы сможете закончить приготовление всех блюд за более короткое время, а ваши гости смогут насладиться трапезой вместе.

Горелки представляют параллельные задачи, а блюда представляют различные процессы или потоки, которые выполняются одновременно. Это позволяет выполнять несколько задач одновременно, повышая эффективность и сокращая общее время, необходимое для выполнения всех задач.

В программировании параллелизм — это концепция одновременного выполнения нескольких потоков или процессов, независимо друг от друга. Это позволяет эффективно использовать ресурсы, такие как ЦП и память, и может привести к более быстрому отклику и динамичности приложения.

Существует два основных типа параллелизма:

  1. Параллелизм: относится к одновременному выполнению нескольких задач с использованием нескольких процессоров или ядер. Параллелизм позволяет системе выполнять большую задачу за меньшее время, разделяя задачу на более мелкие независимые подзадачи, которые можно выполнять одновременно.
  2. Асинхронность: относится к способности системы обрабатывать несколько задач неблокирующим образом. Это означает, что система может продолжать работать над другими задачами, ожидая завершения конкретной задачи.

Параллелизм может быть достигнут многими способами, такими как использование потоков, процессов и программирование, управляемое событиями. Тем не менее, это также может привести к сложности и потенциальным условиям гонки и другим проблемам синхронизации, если не будет обработано должным образом.

Концепция параллелизма присутствовала в Python с первых дней существования языка. Однако конкретные механизмы для достижения параллелизма со временем эволюционировали.

В ранних версиях Python был представлен модуль thread, который предоставлял базовый API для создания потоков и управления ими. Однако глобальная блокировка интерпретатора (GIL) в реализации Python на CPython, которая является наиболее широко используемой реализацией, ограничивает преимущества многопоточности в производительности, не позволяя нескольким потокам одновременно выполнять байт-код Python.

В Python 2.4 был представлен модуль concurrent.futures, который предоставлял высокоуровневый API для работы с потоками и процессами. Этот модуль упростил написание параллельного кода, но по-прежнему страдал от ограничений GIL.

В Python 3.2 был улучшен модуль concurrent.futures и добавлен класс concurrent.futures.ProcessPoolExecutor, что позволило реализовать настоящий параллелизм за счет использования нескольких процессов вместо потоков.

Python 3.4 представил библиотеку asyncio, которая позволяет писать параллельный код с использованием синтаксиса async/await. Эта библиотека предоставляет способ написания асинхронного кода, который может выполняться одновременно без необходимости в потоках или процессах.

В последних версиях Python (3.8+) класс concurrent.futures.ThreadPoolExecutor был обновлен для поддержки использования собственных примитивов потоковой передачи и введения класса threading.Barrier, который можно использовать для синхронизации выполнения потоков.

В целом, история параллелизма в Python показывает постепенную эволюцию доступных механизмов для достижения параллелизма с упором на предоставление абстракций более высокого уровня и упрощение написания параллельного кода.

Резьба

Потоки в Python — это способ достижения параллелизма путем создания и управления несколькими потоками в рамках одного процесса. Каждый поток имеет свой собственный контекст выполнения, который включает в себя собственный стек вызовов и счетчик программ, но он использует то же пространство памяти, что и другие потоки в процессе. Это позволяет эффективно использовать ресурсы, такие как ЦП и память, и может привести к более быстрому отклику и динамичности приложения.

Модуль threading в Python предоставляет базовый API для создания потоков и управления ими. Основные классы этого модуля:

  • Thread: Этот класс представляет один поток выполнения. Вы можете создать новый поток, создав экземпляр этого класса и передав вызываемый объект (например, функцию или метод) в качестве цели. После запуска потока целевая функция будет выполняться в новом потоке.
  • Lock: Этот класс предоставляет способ синхронизации доступа к общим ресурсам между потоками. Замок находится в одном из двух состояний: «заблокирован» или «разблокирован». Только один поток может одновременно удерживать блокировку.
  • RLock: этот класс предоставляет способ синхронизации доступа к общим ресурсам между потоками. RLock находится в одном из двух состояний: «заблокировано» или «разблокировано». RLock может быть получен несколько раз одним и тем же потоком, и он будет разблокирован только тогда, когда один и тот же поток освобождал его столько раз, сколько он его получил.
  • Semaphore: этот класс предоставляет способ синхронизации доступа к общим ресурсам между потоками. Семафор — это неотрицательное значение, разделяемое между потоками. Значение уменьшается на единицу каждый раз, когда поток получает семафор, и увеличивается на единицу каждый раз, когда поток освобождает семафор.
  • Barrier: Этот класс предоставляет способ синхронизации выполнения потоков. Барьер — это объект синхронизации, который позволяет нескольким потокам ждать, пока друг друга не достигнут определенной точки выполнения, прежде чем продолжить.

В дополнение к модулю threading Python также предоставляет модуль concurrent.futures, который предоставляет высокоуровневый API для работы с потоками и процессами. Этот модуль включает в себя следующие классы:

  • ThreadPoolExecutor: этот класс используется для выполнения вызываемых объектов в пуле потоков. Пул потоков создается с фиксированным числом рабочих потоков, и каждый рабочий поток отвечает за выполнение одной задачи за раз.
  • ProcessPoolExecutor: этот класс используется для выполнения вызываемых объектов в пуле процессов. Пул процессов создается с фиксированным количеством рабочих процессов, и каждый рабочий процесс отвечает за выполнение одной задачи за раз.

давайте посмотрим на какой-нибудь пример

import threading
import time

def worker():
    """thread worker function"""
    print('Worker')
    time.sleep(5)
    print('Worker finished')

# create a new thread
t = threading.Thread(target=worker)

# start the thread
t.start()

# main thread continues execution
print('Main thread')

# wait for the worker thread to complete
t.join()

# main thread continues execution after worker thread completes
print('Main thread finished')

В приведенном выше примере мы определяем рабочую функцию, которая имитирует длительную задачу, засыпая на 5 секунд. Затем мы создаем новый поток и назначаем рабочую функцию в качестве цели. Затем в потоке вызывается метод start(), чтобы начать его выполнение. Основной поток продолжает выполнение и печатает «Основной поток». Метод join() вызывается в потоке, чтобы дождаться завершения рабочего потока. После завершения рабочего потока основной поток продолжает выполнение и печатает «Основной поток завершен».

Обратите внимание, что метод join() используется для ожидания завершения рабочего потока, прежде чем основной поток продолжит выполнение. Также важно отметить, что time.sleep(5) используется для моделирования длительной задачи и делает пример с потоками более понятным.

В дополнение к классу Thread модуль threading также предоставляет другие классы, такие как Lock, RLock, Semaphore и Barrier, которые можно использовать для синхронизации выполнения потоков и обеспечения безопасного и контролируемого доступа к общим ресурсам.

Например:

import threading

counter = 0

def increment():
    global counter
    lock.acquire()
    counter += 1
    lock.release()

lock = threading.Lock()

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()

print(counter)

В этом примере объект Lock используется для синхронизации доступа к общей переменной counter. Метод acquire() используется для получения блокировки, а метод release() используется для ее снятия. Это гарантирует, что только один поток может одновременно обращаться к общей переменной, предотвращая условия гонки и другие проблемы синхронизации.

Важно отметить, что многопоточность может быть сложной, поэтому важно быть знакомым с модулем threading и Global Interpreter Lock (GIL) в Python, которые могут повлиять на производительность многопоточных программ.

Более продвинутый пример

import threading
import queue
import time

class DataFetcherThread(threading.Thread):
    """Thread class for fetching data from a remote API"""
    def __init__(self, queue, api_url):
        threading.Thread.__init__(self)
        self.queue = queue
        self.api_url = api_url

    def run(self):
        while True:
            # fetch data from the API
            data = fetch_data_from_api(self.api_url)

            # put the data in the queue
            self.queue.put(data)

            # sleep for a short period of time
            time.sleep(5)

class DataProcessorThread(threading.Thread):
    """Thread class for processing data"""
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # get data from the queue
            data = self.queue.get()

            # process the data
            processed_data = process_data(data)

            # do something with the processed data
            save_to_database(processed_data)

            # signal that the data has been processed
            self.queue.task_done()

def fetch_data_from_api(api_url):
    """Function to fetch data from a remote API"""
    # code to fetch data from the API
    pass

def process_data(data):
    """Function to process data"""
    # code to process data
    pass

def save_to_database(processed_data):
    """Function to save data to a database"""
    # code to save data to the database
    pass

# create a queue to hold the data
data_queue = queue.Queue()

# create the fetcher and processor threads
fetcher = DataFetcherThread(data_queue, 'https://example.com/api')
processor = DataProcessorThread(data_queue)

# start the threads
fetcher.start()
processor.start()

# wait for the threads to complete
data_queue.join()

В приведенном выше примере у нас есть два класса потоков: DataFetcherThread и DataProcessorThread. Класс DataFetcherThread отвечает за выборку данных из удаленного API и помещение их в очередь, а класс DataProcessorThread отвечает за получение данных из очереди, их обработку и сохранение в базе данных.

Класс DataFetcherThread постоянно извлекает данные из API и помещает их в очередь, а класс DataProcessorThread постоянно получает данные из очереди, обрабатывает их и сохраняет в базе данных.

Мы используем объект queue.Queue для хранения данных, что обеспечивает доступ к данным потокобезопасным способом, предотвращая условия гонки и другие проблемы синхронизации.

fetch_data_from_api, process_data и save_to_database — это функции, имитирующие реальный сценарий, когда данные извлекаются из API, обрабатываются и сохраняются в базе данных.

Давайте используем RLock, Semaphore и Barrier для достижения параллелизма в Python:

import threading
import time

counter = 0

def increment():
    global counter
    lock.acquire()
    counter += 1
    lock.release()

# Create a reentrant lock
lock = threading.RLock()

# Create a semaphore with a maximum capacity of 3
semaphore = threading.Semaphore(3)

# Create a barrier with a capacity of 4
barrier = threading.Barrier(4)

def worker1():
    """Thread worker function 1"""
    global counter
    semaphore.acquire()
    increment()
    print('Worker 1:', counter)
    time.sleep(1)
    semaphore.release()
    barrier.wait()

def worker2():
    """Thread worker function 2"""
    global counter
    semaphore.acquire()
    increment()
    print('Worker 2:', counter)
    time.sleep(1)
    semaphore.release()
    barrier.wait()

def worker3():
    """Thread worker function 3"""
    global counter
    semaphore.acquire()
    increment()
    print('Worker 3:', counter)
    time.sleep(1)
    semaphore.release()
    barrier.wait()

def worker4():
    """Thread worker function 4"""
    global counter
    semaphore.acquire()
    increment()
    print('Worker 4:', counter)
    time.sleep(1)
    semaphore.release()
    barrier.wait()

# create 4 threads
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t3 = threading.Thread(target=worker3)
t4 = threading.Thread(target=worker4)

# start the threads
t1.start()
t2.start()
t3.start()
t4.start()

# wait for the threads to complete
t1.join()
t2.join()
t3.join()
t4.join()

print('Counter:', counter)

В приведенном выше примере у нас есть четыре рабочие функции потока worker1, worker2, worker3 и worker4, которые увеличивают глобальную переменную counter и выводят ее значение. Доступ к глобальной переменной защищен повторной блокировкой lock, которая является экземпляром класса RLock.

В этом примере показано, как RLock, Semaphore и Barrier можно использовать для достижения параллелизма и синхронизации выполнения потоков в более сложном сценарии. Класс RLock используется для защиты глобальной переменной counter от условий гонки и других проблем синхронизации, а класс Semaphore используется для управления максимальным количеством потоков, которые могут одновременно обращаться к глобальной переменной. Класс Barrier используется для синхронизации выполнения потоков и обеспечения того, чтобы все они достигли определенной точки выполнения перед продолжением.

Важно отметить, что в этом примере использование RLock гарантирует, что поток может получить блокировку несколько раз, и она будет разблокирована только тогда, когда один и тот же поток освободит ее столько же раз, сколько получил. Это полезно в тех случаях, когда потоку необходимо получить блокировку несколько раз, например, если ему необходимо несколько раз получить доступ к защищенному ресурсу в одной и той же функции.

Использование Semaphore позволяет вам контролировать количество потоков, которые могут одновременно обращаться к защищенному ресурсу, следя за тем, чтобы защищенный ресурс не был перегружен слишком большим количеством потоков.

Использование Barrier позволяет синхронизировать выполнение нескольких потоков, убедившись, что потоки работают вместе и не мешают друг другу.

В целом, используя комбинацию RLock, Semaphore и Barrier, вы можете добиться более сложного и детального управления параллельным выполнением потоков и доступом к общим ресурсам в программе Python.

параллельные.фьючерсы

concurrent.futures — это модуль в стандартной библиотеке Python, предоставляющий высокоуровневый API для работы с потоками и процессами. Он был представлен в Python 2.4 и улучшен в более поздних версиях. Модуль предоставляет два основных класса для достижения параллелизма: ThreadPoolExecutor и ProcessPoolExecutor.

ThreadPoolExecutor используется для выполнения вызываемых объектов в пуле потоков, а ProcessPoolExecutor используется для выполнения вызываемых объектов в пуле процессов. Оба класса работают одинаково, но ProcessPoolExecutor допускает настоящий параллелизм за счет использования нескольких процессов вместо потоков, что может быть более эффективным при выполнении задач, привязанных к ЦП.

Вот пример использования ThreadPoolExecutor для достижения параллелизма в Python:

import concurrent.futures

def long_running_task(n):
    return n*n

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = [executor.submit(long_running_task, i) for i in range(10)]

    for f in concurrent.futures.as_completed(results):
        print(f.result())

В приведенном выше примере мы определяем функцию long_running_task, которая имитирует длительную задачу, выполняя простое вычисление (возведение числа в квадрат). Затем мы используем ThreadPoolExecutor, чтобы представить эту функцию как задачу, которая будет выполняться одновременно с диапазоном входных данных. Метод submit возвращает объект Future, который можно использовать для отслеживания хода выполнения задачи и получения результата. Функция as_completed используется для перебора объектов Future по мере их завершения и печати результатов.

Стоит отметить, что ThreadPoolExecutor создает пул рабочих потоков и назначает задачу доступному потоку из пула. Размер пула можно определить, передав максимальное количество потоков в качестве аргумента конструктору. Если количество задач превышает размер пула, задачи ставятся в очередь до тех пор, пока поток не станет доступным.

Точно так же вот пример использования ProcessPoolExecutor для достижения параллелизма в Python:

import concurrent.futures

def long_running_task(n):
    return n*n

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = [executor.submit(long_running_task, i) for i in range(10)]

    for f in concurrent.futures.as_completed(results):
        print(f.result())

Здесь мы используем ProcessPoolExecutor вместо ThreadPoolExecutor, чтобы отправить функцию long_running_task для одновременного выполнения. Остальной код остается прежним. Используя ProcessPoolExecutor, мы можем использовать истинный параллелизм нескольких процессов для выполнения задачи, связанной с процессором, что может быть более эффективным, чем использование потоков.

Стоит отметить, что при использовании ProcessPoolExecutor задачи выполняются в отдельных процессах Python, что позволяет им выполняться по-настоящему параллельно и в полной мере использовать преимущества нескольких ядер ЦП. Однако это также означает, что задачи не могут совместно использовать память и должны использовать механизмы межпроцессного взаимодействия (IPC) для обмена данными, что может быть более сложным и менее эффективным, чем совместное использование памяти между потоками.

Еще одна вещь, которую следует отметить, заключается в том, что с ProcessPoolExecutor функция задачи и передаваемые ей данные должны быть доступными для выбора, что означает, что функция и данные должны иметь возможность сериализоваться и десериализоваться.

В целом модуль concurrent.futures предоставляет простой и мощный способ достижения параллелизма в Python с помощью пулов потоков и пулов процессов. Он абстрагируется от многих низкоуровневых деталей работы с потоками и процессами и предоставляет более удобный API для одновременного выполнения вызываемых объектов.

Давайте посмотрим на более продвинутые примеры

В приведенном ниже примере у нас есть функция long_running_task, которая имитирует длительную задачу, засыпая на 1 секунду. У нас также есть функция обратного вызова callback_function, которая вызывается, когда задача завершена.

Мы используем ProcessPoolExecutor для отправки функции long_running_task для одновременного выполнения в отдельных процессах и используем метод add_done_callback для регистрации функции callback_function, которая будет вызываться при завершении задачи.

Затем мы используем ThreadPoolExecutor аналогичным образом, но на этот раз задачи выполняются одновременно в пуле потоков, а не в отдельных процессах.

В обоих случаях мы можем видеть идентификаторы процессов (PID) основного процесса, задач и функции обратного вызова результата. Это позволяет нам видеть, какой процесс выполняет какую задачу и какой процесс генерирует результат.

Стоит отметить, что мы также можем передать количество процессов или потоков в конструкторы-исполнители, например, ProcessPoolExecutor(max_workers=4) или ThreadPoolExecutor(max_workers=4), и это соответственно ограничит размер пула.

Еще одним преимуществом использования функции обратного вызова является то, что она позволяет нам выполнять дополнительную обработку результатов, например сохранять результаты в базе данных или отправлять их в другую службу, не блокируя основной процесс.

import concurrent.futures
import os
import time

def long_running_task(n):
    print(f"Task {n} is running on process {os.getpid()}")
    time.sleep(1)
    return n * n

def callback_function(future):
    result = future.result()
    print(f"Result {result} was generated by process {os.getpid()}")

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = [executor.submit(long_running_task, i) for i in range(10)]

    # Add a callback to be called when a future is done
    for result in results:
        result.add_done_callback(callback_function)

print(f"Main process {os.getpid()}")

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = [executor.submit(long_running_task, i) for i in range(10)]

    # Add a callback to be called when a future is done
    for result in results:
        result.add_done_callback(callback_function)

print(f"Main process {os.getpid()}")

асинцио

asyncio — это модуль в стандартной библиотеке Python, который обеспечивает основу для написания параллельного кода с использованием парадигмы асинхронного программирования. Он был представлен в Python 3.4 и улучшен в более поздних версиях.

asyncio использует концепцию сопрограмм и цикла событий для достижения параллелизма. Сопрограммы — это специальные типы функций, которые можно приостанавливать и возобновлять, что позволяет одновременно выполнять несколько задач без необходимости в потоках или процессах. Цикл событий — это центральный механизм, который планирует и запускает сопрограммы.

Вот пример использования asyncio для достижения параллелизма в Python:

import asyncio

async def long_running_task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)
    print(f"Task {n} finished")
    return n * n

async def main():
    tasks = [long_running_task(i) for i in range(10)]
    completed, pending = await asyncio.wait(tasks)
    for task in completed:
        print(task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

В приведенном выше примере у нас есть асинхронная функция long_running_task, которая имитирует длительную задачу с помощью await asyncio.sleep(1). Эта функция украшена ключевым словом async, которое позволяет приостанавливать и возобновлять ее работу.

Функция main также является асинхронной функцией, она создает список сопрограмм long_running_task и использует функцию asyncio.wait для планирования их выполнения. Функция wait возвращает два набора сопрограмм, один для завершенных задач и один для ожидающих выполнения задач. Затем мы перебираем выполненные задачи и печатаем результаты, используя метод result() объекта Task.

Цикл событий создается с помощью функции asyncio.get_event_loop(), а функция main запускается с помощью метода loop.run_until_complete(). Этот метод запускает цикл обработки событий до тех пор, пока не будут выполнены все запланированные задачи.

Стоит отметить, что asyncio позволяет нам писать параллельный код, аналогичный синхронному коду, используя ключевые слова async и await. Это делает его гораздо более интуитивным и легким для чтения и понимания, чем код, использующий потоки или процессы.

Вот еще один пример, на этот раз с использованием asyncio.gather:

import asyncio

async def long_running_task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)
    print(f"Task {n} finished")
    return n * n

async def main():
    results = await asyncio.gather(*(long_running_task(i) for i in range(10)))
    for result in results:
        print(result)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

В приведенном выше примере мы используем функцию asyncio.gather для планирования и одновременного запуска задач, и она возвращает результат в виде списка завершенных задач.

В обоих примерах asyncio предоставляет простой и мощный способ достижения параллелизма в Python с помощью сопрограмм и цикла обработки событий, а также позволяет создавать более элегантный и читаемый код, чем использование потоков или процессов.

Дополнительные примеры

import asyncio

async def long_running_task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)
    print(f"Task {n} finished")
    return n * n

async def main():
    semaphore = asyncio.Semaphore(3)
    tasks = [long_running_task_with_semaphore(semaphore, i) for i in range(10)]
    completed, pending = await asyncio.wait(tasks)
    for task in completed:
        print(task.result())

async def long_running_task_with_semaphore(semaphore, n):
    async with semaphore:
        return await long_running_task(n)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Здесь у нас есть семафор semaphore, который контролирует максимальное количество задач, которые могут выполняться одновременно. Мы используем класс asyncio.Semaphore для создания семафора и передаем ему максимальную емкость 3.

Затем мы создаем новую асинхронную функцию long_running_task_with_semaphore, которая принимает семафор и ввод задачи в качестве параметров. Эта функция использует оператор async with для получения семафора перед запуском функции long_running_task и автоматически освобождает семафор после завершения задачи.

В функции main мы создаем список сопрограмм long_running_task_with_semaphore и используем функцию asyncio.wait для планирования их выполнения. Функция wait возвращает два набора сопрограмм, один для завершенных задач и один для ожидающих выполнения задач. Затем мы перебираем выполненные задачи и печатаем результаты, используя метод result() объекта Task.

Цикл событий создается с помощью функции asyncio.get_event_loop(), а функция main запускается с помощью метода loop.run_until_complete(). Этот метод запускает цикл обработки событий до тех пор, пока не будут выполнены все запланированные задачи.

В приведенном выше примере мы можем видеть, как мы можем использовать класс Semaphore из asyncio для управления количеством одновременных задач, ограничивая количество токенов семафора, в этом случае одновременно может выполняться только 3 задачи. Это более сложный способ управления параллелизмом и предотвращения перегрузки системы.

В целом, все эти методы обеспечивают различные способы достижения параллелизма в Python, каждый со своими преимуществами и недостатками.