Ожидается, что dask.compute(...) будет блокирующим вызовом. Однако, когда я вложил dask.compute, а внутренний выполняет ввод-вывод (например, dask.dataframe.read_parquet), внутренний dask.compute не блокируется. Вот пример псевдокода:
import dask, distributed
def outer_func(name):
files = find_files_for_name(name)
df = inner_func(files).compute()
# do work with df
return result
def inner_func(files):
tasks = [ dask.dataframe.read_parquet(f) for f in files ]
tasks = dask.dataframe.concat(tasks)
return tasks
client = distributed.Client(scheduler_file=...)
results = dask.compute([ dask.delay(outer_func)(name) for name in names ])
Если бы я запустил 2 рабочих с 8 процессами в каждом, например:
dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1
, то я ожидаю, что будет запущено не более 2 x 8 одновременных inner_func, потому что inner_func(files).compute() должен блокироваться. Однако я заметил, что внутри одного рабочего процесса, как только он запускает шаг read_parquet, может запускаться другой inner_func(files).compute(). Таким образом, в конце может быть запущено несколько inner_func(files).compute(), и иногда это может вызвать ошибку нехватки памяти.
Это ожидаемое поведение? Если да, то есть ли способ применить один inner_func(files).compute() для каждого рабочего процесса?