Распараллеливание функции с несколькими аргументами списков с многопроцессорной обработкой Python

Я надеюсь, что это не дубликат, но я не смог найти полностью удовлетворяющий ответ для этой конкретной проблемы.

Учитывая функцию с несколькими аргументами списка и одним итерируемым, например. здесь с двумя списками

def function(list1, list2, iterable):
    i1 = 2*iterable
    i2 = 2*iterable+1
    list1[i1] *= 2
    list2[i2] += 2
    return(list1, list2)

Доступ к каждому списку осуществляется по разным записям, поэтому операции разделены и могут быть парализованы. Каков наилучший способ сделать это с помощью многопроцессорной обработки Python?

Одним из простых способов распараллеливания было бы использование функции карты:

import multiprocessing as mp
from functools import partial

list1, list2 = [1,1,1,1,1], [2,2,2,2,2]
func = partial(function, list1, list2)
pool = mp.Pool()
pool.map(func, [0,1])

Проблема в том, что если кто-то делает это, он создает для каждого процесса копию списков (если я правильно понимаю функцию карты) и затем работает параллельно в разных позициях в этих копиях. В конце (после того, как были затронуты две итерации [0,1]) результат pool.map будет

[([3, 1, 1, 1, 1], [2, 4, 2, 2, 2]), ([1, 1, 3, 1, 1], [2, 2, 2, 4, 2])]

но я хочу

[([3, 1, 3, 1, 1], [2, 4, 2, 4, 2])].

Как этого добиться? Следует ли разделить список на итерацию раньше, запустить определенные операции параллельно, а затем снова объединить их?

Заранее спасибо и извините, если я что-то путаю, я только начал использовать multiprocessing-библиотеку.

EDIT: Операции над разными частями списка могут быть парализованы без синхронизации, операции над всем списком не могут быть парализованы (без синхронизации). Поэтому решение моей конкретной проблемы состоит в том, чтобы разделить списки и функции на операции и на части списков. После этого объединяются части списков, чтобы получить весь список обратно.


person mmarah    schedule 07.05.2018    source источник


Ответы (2)


Вы не можете совместно использовать память между процессами (технически вы можете в системах на основе ветвления при условии, что вы не меняете объекты/влияет на количество ссылок, что редко когда-либо происходит в реальном использовании) - вы можете либо использовать общая структура (большинство из них доступны в разделе multiprocessing.Manager()), который будет выполнять синхронизацию/обновления за вас, или передавать только данные, необходимые для обработки, а затем сшивать результат.

Ваш пример достаточно прост, чтобы оба подхода работали без серьезных штрафов, поэтому я бы просто пошел с менеджером:

import multiprocessing
import functools

def your_function(list1, list2, iterable):
    i1 = 2 * iterable
    i2 = 2 * iterable + 1
    list1[i1] *= 2
    list2[i2] += 2

if __name__ == "__main__":  # a multi-processing guard for cross-platform use
    manager = multiprocessing.Manager()
    l1 = manager.list([1, 1, 1, 1, 1])
    l2 = manager.list([2, 2, 2, 2, 2])
    func = functools.partial(your_function, l1, l2)
    pool = multiprocessing.Pool()
    pool.map(func, [0, 1])
    print(l1, l2)  # [2, 1, 2, 1, 1] [2, 4, 2, 4, 2]

Или, если ваш вариант использования более благоприятен для сшивания данных после обработки:

import multiprocessing
import functools

def your_function(list1, list2, iterable):
    i1 = 2 * iterable
    i2 = 2 * iterable + 1
    return (i1, list1[i1] * 2), (i2, list2[i2] + 2)  # return the changed index and value

if __name__ == "__main__":  # a multi-processing guard for cross-platform use
    l1 = [1, 1, 1, 1, 1]
    l2 = [2, 2, 2, 2, 2]
    func = functools.partial(your_function, l1, l2)
    pool = multiprocessing.Pool()
    results = pool.map(func, [0, 1])
    for r1, r2 in results:  # stitch the results back into l1 and l2
        l1[r1[0]] = r1[1]
        l2[r2[0]] = r2[1]
    print(l1, l2)  # [2, 1, 2, 1, 1] [2, 4, 2, 4, 2]

При этом вывод - это не то, что вы указали/ожидали, а то, что должно произойти на основе вашей функции.

Кроме того, если ваш случай настолько прост, вы можете вообще отказаться от многопроцессорной обработки - дополнительная многопроцессорная нагрузка (плюс синхронизация диспетчера) того не стоит, если только your_function() не выполняет какую-то задачу, требующую большой нагрузки на ЦП.

person zwer    schedule 16.05.2018
comment
Большое спасибо за ответ. Я только что добавил решение, похожее на ваше второе решение. На самом деле этот простой случай является просто педагогическим примером. Мой your_function() на практике выполняет некоторые задачи с интенсивным использованием ЦП (несколько операций с тензорами, такими как numpy.tensordot(), scipy.linalg.eigsh(), numpy.linalgs.svd()). - person mmarah; 16.05.2018

Вот решение проблемы. Я не знаю, лучший ли это способ, но он работает:

import multiprocessing as mp
from functools import partial

def operation1(lst, pos)
    return(pos, lst[pos] * 2)

def operation2(lst, pos)
    return(pos, lst[pos] + 2)

if __name__ == "__main__":
    list1, list2 = [1,1,1,1,1], [2,2,2,2,2]
    iterable = [0,1]
    index1_list = [2*i for i in iterable]
    index2_list = [2*i+1 for i in iterable]

    func1 = partial(operation1, list1)
    func2 = partial(operation2, list2)

    with mp.Pool() as pool:
        result1 = pool.map(func1, index1_list)
        result2 = pool.map(func2, index2_list)

    for result in result1:
        list1[result[0]] = result[1]

    for result in result2:
        list2[result[0]] = result[1]

    print(list1, list2)
person mmarah    schedule 16.05.2018