Многопроцессорный цикл Python

Я надеюсь использовать multiprocessing для ускорения медленного цикла. Однако из того, что я видел в примерах многопроцессорности, я не уверен, что такая реализация является хорошей практикой, осуществимой или возможной.

Цикл обычно состоит из двух частей: data ingestion и data processing. Я хотел бы, чтобы следующая часть приема данных начиналась во время обработки, чтобы данные были доступны как можно скорее.

Псевдокод:

d = get_data(n)
for n in range(N):
    p = process_data(d)
    d = get_data(n+1) #prepare data for next process loop
  1. Подходит ли многопроцессорная обработка для такого рода функций?
  2. Как бы это сделать?

Заранее спасибо.


person Darius    schedule 14.02.2018    source источник
comment
По какой причине вы не можете объединить получение и обработку данных в один модуль и выполнять действия модуля параллельно?   -  person ShadowRanger    schedule 14.02.2018
comment
Обработка причинно-зависимая от предыдущих процессов.   -  person Darius    schedule 14.02.2018
comment
По сути, вы хотите запускать process_data(d) и get_data(n+1) параллельно?   -  person Kaiser Dandangi    schedule 14.02.2018
comment
Я считаю, что это правильно   -  person Darius    schedule 14.02.2018
comment
Ограничен ли ЦП процессов? Если нет, то вам, вероятно, лучше использовать многопоточность, так как с ней проще работать. И многопоточность, и многопроцессорность могут использовать очереди, которые позволяют легко направлять данные через несколько функций.   -  person all or None    schedule 14.02.2018
comment
В своем цикле вы можете создать два процесса (process_data и get_data), затем вызвать метод start() для обоих, а затем вызвать метод join() для обоих.   -  person John Anderson    schedule 14.02.2018


Ответы (2)


Как вы сказали, многопроцессорность — это в основном диспетчеризация и сбор работы. И, как вы пояснили, вы в основном хотите, чтобы process_data и get_data работали параллельно.

Вот мое решение для вас

import multiprocessing as mp

# create pool for dispatching work
pool = mp.Pool()

# call your functions asynchronously
process_data_process = pool.apply_async(process_data, (d,))
get_data_process = pool.apply_async(get_data, (n+1,))

# After your functions are dispatched, wait for results
process_data_result = process_data_process.get()
get_data_result = get_data_process.get()

# Note: get_data_result will not be fetched till process_data_result is ready
# But that should be fine since you can't start the next batch
# till this batch is done

И вы можете просто обернуть это в свой цикл. Надеюсь это ответит на твой вопрос!

person Kaiser Dandangi    schedule 14.02.2018
comment
Идеальный ответ - Спасибо! Еще одним интересным моментом было бы то, выигрывает ли «process_data» от нахождения в пуле, поскольку его все равно нужно ждать? - person Darius; 14.02.2018
comment
@ Дариус Я полагаю, вы правы, вы можете запустить асинхронный процесс, а затем просто синхронно запустить другой. Затем, когда вы закончите, вы можете дождаться завершения асинхронного процесса. - person Kaiser Dandangi; 15.02.2018

Предположим, вы хотите, чтобы один поток/процесс принимал данные, потому что это будет ввод-вывод, а не привязка к процессору. Вы выполняете только минимальный анализ и/или проверку данных перед передачей их на уровень обработки.

Далее предположим, что вы можете выполнять обработку данных для каждого элемента ввода полностью параллельно; что между этими входными элементами нет ни сортировки, ни времени/последовательности.

В этом случае ваша задача - это в основном дочерний плакат для модели обработки "разветвления". Вы создаете объект multiprocessing.Queue. Затем вы создаете multiprocessing.Pool. Затем этот код инициализации становится задачей обработки приема («производителем» для очереди), а весь пул процессов становится потребителями, выполняющими обработку.

В Интернете есть множество примеров этого, и в первой ссылке, вероятно, есть несколько примеров использования этого шаблона.

Оставшийся вопрос, конечно, заключается в том, как вы собираетесь обрабатывать результаты.

Если им нужно сериализовать обратно в какой-то один файл, то очевидным подходом было бы создание двух объектов Queue... один для рабочей очереди (процесс приема загружает его, процессы пула потребляют из него), а другой - выходная очередь ( пулы загружаются в него, а затем один процесс использует его для согласованной записи результатов в ваш вывод). Обратите внимание, что возможно, а иногда и весьма эффективно, мультиплексировать главный (принимающий) процесс. Он может чередовать чтение входных данных с опросами в выходной очереди для записи результатов. Но, конечно, вы также можете просто запустить другой процесс, посвященный обработке вывода.

С другой стороны, возможно, что ваши результаты могут быть записаны параллельно, возможно, рабочими процессами. Это нормально, если вы записываете результаты во множество файлов или отправляете их как операторы INSERT или UPDATE в какую-либо базу данных SQL или передаете их в Hadoop HDFS или в Spark DataSet. Существует множество форм вывода, поддающихся параллельной записи.

Также возможно, что вы захотите разделить слои обработки и вывода/обработки результатов. Возможно, ваше приложение будет оптимально настроено с большим количеством процессов на уровне обработки данных и меньшим числом процессов на уровне вывода. (Если обработка каждого элемента интенсивно использует ЦП и у вас, например, много ядер, у вас могут возникнуть проблемы с слишком большим количеством процессов, забивающих ваши каналы ввода-вывода, в то время как ЦП простаивают).

Опять же, используйте очереди. Они предназначены для поддержки согласованности между несколькими производителями и несколькими потребителями. Вы избавлены от беспокойства по поводу блокировки параллелизма, тупиковой и динамической блокировки и так далее.

person Jim Dennis    schedule 14.02.2018