Поведение планировщика Dask при чтении/получении больших наборов данных

Это продолжение этого вопроса.

У меня возникают проблемы с сохранением большого набора данных в распределенной памяти. У меня есть планировщик, работающий на одной машине, и 8 рабочих, каждый из которых работает на своей машине, соединенной 40-гигабитным Ethernet и резервной файловой системой Lustre.

Проблема 1:

ds = DataSlicer(dataset) # ~600 GB dataset
dask_array = dask.array.from_array(ds, chunks=(13507, -1, -1), name=False) # ~22 GB chunks
dask_array = client.persist(dask_array)

При просмотре панели мониторинга состояния Dask я вижу, что все 28 задач назначаются и обрабатываются одним работником, в то время как другие работники ничего не делают. Кроме того, когда каждая задача завершила обработку и все задачи находятся в состоянии «В памяти», в кластере фактически хранится только 22 ГБ ОЗУ (т. е. первая часть набора данных). Доступ к индексам в первом блоке выполняется быстро, но любые другие индексы вызывают новый раунд чтения и загрузки данных до того, как будет возвращен результат. Это кажется противоречащим моему убеждению, что .persist() должен закреплять полный набор данных в памяти рабочих процессов после завершения выполнения. Кроме того, когда я увеличиваю размер фрагмента, одному рабочему процессу часто не хватает памяти, и он перезапускается из-за того, что ему назначено несколько огромных фрагментов данных.

Есть ли способ вручную назначать куски рабочим процессам вместо того, чтобы планировщик сваливал все задачи в один процесс? Или это ненормальное поведение планировщика? Есть ли способ обеспечить загрузку всего набора данных в оперативную память?

Проблема 2

Я нашел временный обходной путь, рассматривая каждый фрагмент набора данных как отдельный массив дасков и сохраняя каждый из них по отдельности.

dask_arrays = [da.from_delayed(lazy_slice, shape, dtype, name=False) for \
               lazy_slice, shape in zip(lazy_slices, shapes)]
for i in range(len(dask_arrays)):
    dask_arrays[i] = client.persist(dask_arrays[i])

Я проверил пропускную способность от сохраненных и опубликованных массивов дасков до нескольких параллельных считывателей, параллельно вызывая .compute() для разных фрагментов набора данных. Мне никогда не удавалось достичь совокупной пропускной способности кластера dask более 2 ГБ/с, что намного ниже возможностей нашей сети.

Является ли планировщик узким местом в этой ситуации, т. е. все ли данные направляются через планировщик моим читателям? Если это так, есть ли способ получить данные в памяти непосредственно от каждого работника? Если это не так, какие еще области в dask я могу исследовать?


person A.C.    schedule 18.06.2018    source источник