Передать несколько параметров в concurrent.futures.Executor.map?

concurrent.futures.Executor.map принимает переменное количество итерации, из которых вызывается заданная функция. Как мне его назвать, если у меня есть генератор, который создает кортежи, которые обычно распаковываются на месте?

Следующее не работает, потому что каждый из сгенерированных кортежей задается как другой аргумент для сопоставления:

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):
    pass

Без генератора желаемые аргументы для отображения могут выглядеть так:

executor.map(
    f,
    (i[0] for i in args),
    (i[1] for i in args),
    ...,
    (i[N] for i in args),
)

person Matt Joiner    schedule 22.07.2011    source источник
comment
Я не понимаю, чего ты хочешь. В вашем последнем редактировании пример без генератора не работает, так как каждый элемент в генераторе имеет только два элемента, каково значение N?   -  person vz0    schedule 08.08.2011
comment
@vz0: N — количество элементов в кортежах, сгенерированных args.   -  person Matt Joiner    schedule 09.08.2011


Ответы (8)


Вам нужно удалить * в вызове map:

args = ((a, b) for b in c)
for result in executor.map(f, args):
    pass

Это вызовет f, len(args) раз, где f должен принимать один параметр.

Если вы хотите, чтобы f принимал два параметра, вы можете использовать лямбда-вызов, например:

args = ((a, b) for b in c)
for result in executor.map(lambda p: f(*p), args):   # (*p) does the unpacking part
    pass
person vz0    schedule 08.08.2011
comment
Мне нужна лямбда-часть. Можно подробнее о возможностях? - person Matt Joiner; 09.08.2011
comment
Я знаю, что это устарело, но когда я это делаю, я получаю следующую ошибку: my_method() argument after * must be a sequence, not long - person KVISH; 06.01.2016
comment
Должна ли первая строка быть args = ((a, b) for b in c) - person Tom; 31.05.2018
comment
@ Том, да, так и должно быть! Спасибо - person vz0; 01.06.2018
comment
@ vz0, почему подчеркнуто «В Linux»? зависит ли поведение от ОС? - person pcko1; 18.12.2020
comment
@ pcko1, это отличный вопрос, история редактирования показывает, что это редактирование сделал кто-то другой. Почему? Я понятия не имею! - person vz0; 29.12.2020
comment
тем не менее этот ответ бесценен - person pcko1; 29.12.2020

Один аргумент, который повторяется, один аргумент в c

from itertools import repeat
for result in executor.map(f, repeat(a), c):
    pass

Нужно распаковать предметы c, можно распаковать c

from itertools import izip
for result in executor.map(f, *izip(*c)):
    pass

Нужно распаковать предметы c, не могу распаковать c

  1. Измените f, чтобы он принимал один аргумент и распаковывал аргумент в функции.
  2. Если каждый элемент в c имеет переменное количество элементов или вы вызываете f всего несколько раз:

    executor.map(lambda args, f=f: f(*args), c)
    

    Он определяет новую функцию, которая распаковывает каждый элемент из c и вызывает f. Использование аргумента по умолчанию для f в lambda делает f локальным внутри lambda и, таким образом, сокращает время поиска.

  3. Если у вас есть фиксированное количество аргументов и вам нужно вызывать f много раз:

    from collections import deque
    def itemtee(iterable, n=2):
        def gen(it = iter(iterable), items = deque(), next = next):
            popleft = items.popleft
            extend = items.extend
            while True:
                if not items:
                    extend(next(it))
                yield popleft()
        return [gen()] * n
    
    executor.map(f, *itemtee(c, n))
    

Где n — количество аргументов для f. Это адаптировано из itertools.tee.

person agf    schedule 08.08.2011
comment
Повторить полезно, но мой пример отличается от вопроса. Я пытался улучшить его. Прости за это. - person Matt Joiner; 08.08.2011
comment
Да, эта распаковка zip работает, но все содержимое генератора потребляется при распаковке аргументов в zip. У лямбда также есть то преимущество, что не каждый вызов функции map должен иметь одинаковое количество аргументов (не то чтобы это требование). - person Matt Joiner; 09.08.2011
comment
Это была меньшая из проблем, большая проблема заключалась в необходимости обработки всего генератора. - person Matt Joiner; 09.08.2011
comment
Нет-нет, я хочу распаковать каждый сгенерированный элемент в качестве аргументов для f. for p in args: f(*p). Извините, что так сложно объяснить :\ - person Matt Joiner; 11.08.2011
comment
Я написал аналогичную альтернативную форму itertools.tee, но обнаружил, что лямбда @vzo является гораздо более простым решением. Хотите объяснить, почему лямбда-форма имеет высокие накладные расходы? - person Matt Joiner; 11.08.2011
comment
Вызовы функций и поиск пространства имен в Python выполняются медленно. map ищет f и каждую итерацию один раз, а затем использует их повторно. С lambda ему приходится каждый раз искать f, вызывать как lambda, так и f, а также распаковывать аргументы. Если f выполняется много секунд или дольше, или вы вызываете его всего несколько раз, эти накладные расходы незначительны. Мой itemtee использует только локальные переменные и (как отредактировано) даже не использует поиск атрибутов для вызовов методов. - person agf; 11.08.2011
comment
Вам следует рассмотреть возможность использования аргумента по умолчанию в lambda для ускорения вызова f. Смотрите мою правку. - person agf; 11.08.2011
comment
@agf - Любые мысли по моему вопросу - stackoverflow.com/questions/56492876/? - person gansub; 08.06.2019

Вы можете использовать каррирование для создания новой функции с помощью метода partial в Python.

from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(param1, param2):
    # some code

# currying some_func with 'a' argument is repeated
func = partial(some_func, a)
with ThreadPoolExecutor() as executor:
    executor.map(func, list_of_args):
    ...

Если вам нужно передать несколько одинаковых параметров, вы можете передать их partial метод

func = partial(some_func, a, b, c)
person Vlad Bezden    schedule 26.10.2018
comment
Это решение не работает для меня. Как компилятор должен знать, что список в list_of_args относится к определенному атрибуту some_func? Скажем, у меня есть функция с 5 параметрами: Conduct_analysis(a, b, c, d, e) и я делаю: partial_analysis = partial(conduct_analysis, a=a, c=c, d=d, e=e). Затем я вызываю: res = executor.map(partial_analysis, list_of_b). Компилятор не знает, что содержимое list_of_b должно заполнить отсутствующий параметр b частичного. Когда я запускаю код с функцией с пятью аргументами, я получаю различные исключения, например, тип X не имеет атрибута Y или позиционных аргументов и т. д. - person RobertSzooba; 11.10.2020
comment
@David сначала поставь то, что ты не маскируешь, так что conduct_analysis(b, a, c, d, e) - person CpILL; 17.03.2021

Итак, предположим, что у вас есть функция, которая принимает 3 аргумента, и все 3 аргумента являются динамическими и продолжают изменяться при каждом вызове. Например:

def multiply(a,b,c):
    print(a * b * c)

Чтобы вызвать это несколько раз с использованием потоков, я бы сначала создал список кортежей, где каждый кортеж является версией a, b, c:

arguments = [(1,2,3), (4,5,6), (7,8,9), ....]

Чтобы мы знали, что функция map concurrent.futures будет принимать первый аргумент как целевую функцию, а второй аргумент как список аргументов для каждой версии функции, которая будет выполняться. Таким образом, вы можете сделать вызов следующим образом:

for _ in executor.map(multiply, arguments) # Error

Но это выдаст вам ошибку, что функция ожидала 3 arguments but got only 1. Чтобы решить эту проблему, создадим вспомогательную функцию:

def helper(numbers):
    multiply(numbers[0], numbers[1], numbers[2])

Теперь мы можем вызвать эту функцию с помощью исполнителя следующим образом:

with ThreadPoolExecutor() as executor:
     for _ in executor.map(helper, arguments):
         pass

Это должно дать вам желаемые результаты.

person Baqir Khan    schedule 05.12.2019

Вот фрагмент кода, показывающий, как отправить несколько аргументов в функцию с помощью ThreadPoolExecutor:

import concurrent.futures


def hello(first_name: str, last_name: str) -> None:
    """Prints a friendly hello with first name and last name"""
    print('Hello %s %s!' % (first_name, last_name))


def main() -> None:
    """Examples showing how to use ThreadPoolExecutor and executer.map
    sending multiple arguments to a function"""

    # Example 1: Sending multiple arguments using tuples
    # Define tuples with sequential arguments to be passed to hello()
    args_names = (
        ('Bruce', 'Wayne'),
        ('Clark', 'Kent'),
        ('Diana', 'Prince'),
        ('Barry', 'Allen'),
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the tuple (*f) into hello(*args)
        executor.map(lambda f: hello(*f), args_names)

    print()

    # Example 2: Sending multiple arguments using dict with named keys
    # Define dicts with arguments as key names to be passed to hello()
    kwargs_names = (
        {'first_name': 'Bruce', 'last_name': 'Wayne'},
        {'first_name': 'Clark', 'last_name': 'Kent'},
        {'first_name': 'Diana', 'last_name': 'Prince'},
        {'first_name': 'Barry', 'last_name': 'Allen'},
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the dict (**f) into hello(**kwargs)
        executor.map(lambda f: hello(**f), kwargs_names)


if __name__ == '__main__':
    main()
person Leandro Toledo    schedule 30.04.2021

Для ProcessPoolExecutor.map():

Аналогично map(func, *iterables), за исключением:

итерации собираются немедленно, а не лениво;

func выполняется асинхронно, и несколько вызовов func могут быть выполнены одновременно.

Попробуйте запустить следующий фрагмент под python 3, и все станет ясно:

from concurrent.futures import ProcessPoolExecutor

def f(a, b):
    print(a+b)

with ProcessPoolExecutor() as pool:
    pool.map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2))

# 0, 2, 4

array = [(i, i) for i in range(3)]
with ProcessPoolExecutor() as pool:
    pool.map(f, *zip(*array))

# 0, 2, 4
person Tengerye    schedule 30.05.2019

Я видел так много ответов здесь, но ни один из них не так прост, как использование лямбда-выражений:

foo(x,y): пройти

хотите вызвать вышеуказанный метод 10 раз с одинаковым значением, т.е. xVal и yVal? с concurrent.futures.ThreadPoolExecutor() в качестве исполнителя:

for _ in executor.map( lambda _: foo(xVal, yVal), range(0, 10)):
    pass
person H L    schedule 07.11.2019
comment
Спасибо, было очень просто и смог запустить мой код в потоке - person Safeer Abbas; 08.01.2021

скажем, у вас есть данные, подобные этому, во фрейме данных, показанном ниже, и вы хотите передать первые два столбца функции, которая будет считывать изображения и предсказывать особенности, а затем вычислять разницу и возвращать значение разницы.

Примечание: у вас может быть любой сценарий в соответствии с вашими требованиями, и, соответственно, вы можете определить функцию.

Приведенный ниже фрагмент кода будет принимать эти два столбца в качестве аргумента и передавать механизму Threadpool (также показывая индикатор выполнения)

введите здесь описание изображения

''' function that will give the difference of two numpy feature matrix'''
def getDifference(image_1_loc, image_2_loc, esp=1e-7):
       arr1 = ''' read 1st image and extract feature '''
       arr2 = ''' read 2nd image and extract feature '''
       diff = arr1.ravel() - arr2.ravel() + esp    
       return diff

'''Using ThreadPoolExecutor from concurrent.futures with multiple argument'''

with ThreadPoolExecutor() as executor:
        result = np.array(
                         list(tqdm(
                                   executor.map(lambda x : function(*x), [(i,j) for i,j in df[['image_1','image_2']].values]),
                               total=len(df)
                                  ) 
                             )
                          )

введите здесь описание изображения

person Vaibhav K    schedule 29.06.2021