вложенный dask.compute не блокирует

Ожидается, что dask.compute(...) будет блокирующим вызовом. Однако, когда я вложил dask.compute, а внутренний выполняет ввод-вывод (например, dask.dataframe.read_parquet), внутренний dask.compute не блокируется. Вот пример псевдокода:

import dask, distributed

def outer_func(name):
    files = find_files_for_name(name)
    df = inner_func(files).compute()
    # do work with df
    return result

def inner_func(files):
    tasks = [ dask.dataframe.read_parquet(f) for f in files ]
    tasks = dask.dataframe.concat(tasks)
    return tasks

client = distributed.Client(scheduler_file=...)
results = dask.compute([ dask.delay(outer_func)(name) for name in names ])

Если бы я запустил 2 рабочих с 8 процессами в каждом, например:

dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1

, то я ожидаю, что будет запущено не более 2 x 8 одновременных inner_func, потому что inner_func(files).compute() должен блокироваться. Однако я заметил, что внутри одного рабочего процесса, как только он запускает шаг read_parquet, может запускаться другой inner_func(files).compute(). Таким образом, в конце может быть запущено несколько inner_func(files).compute(), и иногда это может вызвать ошибку нехватки памяти.

Это ожидаемое поведение? Если да, то есть ли способ применить один inner_func(files).compute() для каждого рабочего процесса?


person user1527390    schedule 09.08.2017    source источник
comment
Кажется, здесь есть небольшая путаница. dask.dataframe создает ленивые объекты, и это ненормально создавать/вычислять их в функции, которая также задерживается/вычисляется. Учтите, что эта функция отправляется рабочему процессу: где вы ожидаете, что произойдут вычисления?   -  person mdurant    schedule 11.08.2017
comment
Вложенность в этом примере довольно типична для реального потока данных ИМХО. На самом деле не всегда осуществимо/желательно работать с распределенной структурой данных, такой как dask DataFrame, чтобы мы могли избежать такого вложения. Потому что dask DataFrame API меньше, чем pandas, и потому что очень важно поддерживать рабочую версию последовательного кода. Из того, что я вижу, inner_func, кажется, работает в нескольких потоках внутри процесса dask-worker, но я указываю только один поток для каждого работника, используя, например: dask-worker --scheduler-file sched.json --nprocs 3 --nthreads 1 --локальный-каталог /tmp/   -  person user1527390    schedule 17.08.2017


Ответы (2)


Когда вы просите распределенный планировщик dask запустить работу, он отправляет код функций и любые необходимые данные рабочим функциям, которые находятся в разных процессах, возможно, на разных машинах. Эти рабочие процессы добросовестно выполняют функции, работая как обычный код Python. Дело в том, что работающая функция не знает, что она находится на dask worker — по умолчанию она увидит, что не настроен глобальный распределенный клиент dask, и сделает то, что dask обычно делает в этом случае: выполняет любой dask рабочие нагрузки в планировщике по умолчанию (поточном).

Если вам действительно необходимо выполнять полные вычислительные операции в задачах и вы хотите, чтобы они использовали распределенный планировщик, выполняющий эти задачи, вам потребуется использовать рабочий клиент. Однако я чувствую, что в вашем случае перефразирование задания для удаления вложенности (что-то вроде приведенного выше псевдокода, хотя это может работать и с вычислениями), вероятно, является более простым подходом.

person mdurant    schedule 17.08.2017

Это не относится к многопроцессорному планировщику.

Чтобы использовать распределенный планировщик, я нашел обходной путь, используя пошаговую отправку заданий через API Distributed.Client, а не полагаясь на dask.compute. dask.compute подходит для простых случаев использования, но явно не имеет четкого представления о том, сколько невыполненных задач может быть запланировано, поэтому в этом случае перегружает систему.

Вот псевдокод для запуска набора задач dask.Delayed с стимуляцией:

import distributed as distr

def paced_compute(tasks, batch_size, client):
    """
    Run delayed tasks, maintaining at most batch_size running at any
    time. After the first batch is submitted,
    submit a new job only after an existing one is finished, 
    continue until all tasks are computed and finished.

    tasks: collection of dask.Delayed
    client: distributed.Client obj
    """
    results, tasks = [], list(tasks)
    working_futs = client.compute(tasks[:batch_size])
    tasks = tasks[batch_size:]
    ac = distr.as_completed(working_futs)
    for fut in ac:
        res = fut.result()
        results.append(res)
        if tasks:
            job = tasks.pop()
            ac.add(client.compute(job))
    return results
person user1527390    schedule 10.08.2017