Как соотносятся перегородки мешков Dask и рабочие?

Я использую ванильную установку Dask-Kubernetes с двумя рабочими процессами и одним планировщиком для перебора строк некоторого файла JSON (и применения некоторых функций, которые здесь не показаны для простоты). Я вижу, что когда-либо работал только один рабочий, а я бы ожидал увидеть их двоих.

Надеясь, что перераспределение поможет, я экспериментировал с разными значениями для bag.repartition(num), которые возвращают разное количество строк, но они ничего не меняют в дисбалансе рабочих и потреблении памяти, концентрируясь только на одном работнике.

Я думаю, что не понимаю корреляцию между разделами и рабочими, и я не смог найти ничего в документации Dask об этом. Любая помощь или указатели приветствуются!

import dask.bag as db

def grep_buildings():
    base = "https://usbuildingdata.blob.core.windows.net/usbuildings-v1-1/"
    b = db.text.read_text(f"{base}/Alabama.zip")
    # b = b.repartition(2)
    lines = b.take(3_000_000)
    return lines

len(grep_buildings())

person deeplook    schedule 24.09.2020    source источник


Ответы (1)


В вашем примере вы открываете файл, и он сжат

db.text.read_text(f"{base}/Alabama.zip")

Dask может открывать и обрабатывать несколько файлов параллельно, по крайней мере, с одним разделом на файл. Dask также может разбивать отдельный файл на части (параметр blocksize); но это работает только для несжатых данных. Причина в том, что для методов сжатия всего файла единственный способ добраться до некоторой точки в несжатом потоке - это читать с самого начала, чтобы каждый раздел читал большую часть данных.

Наконец, повторное разбиение не помогает вам, когда вы начинаете с одного раздела: вам нужно прочитать весь этот файл, прежде чем разбивать данные на части для последующих задач.

person mdurant    schedule 24.09.2020