Предположим, вы хотите, чтобы один поток/процесс принимал данные, потому что это будет ввод-вывод, а не привязка к процессору. Вы выполняете только минимальный анализ и/или проверку данных перед передачей их на уровень обработки.
Далее предположим, что вы можете выполнять обработку данных для каждого элемента ввода полностью параллельно; что между этими входными элементами нет ни сортировки, ни времени/последовательности.
В этом случае ваша задача - это в основном дочерний плакат для модели обработки "разветвления". Вы создаете объект multiprocessing.Queue. Затем вы создаете multiprocessing.Pool. Затем этот код инициализации становится задачей обработки приема («производителем» для очереди), а весь пул процессов становится потребителями, выполняющими обработку.
В Интернете есть множество примеров этого, и в первой ссылке, вероятно, есть несколько примеров использования этого шаблона.
Оставшийся вопрос, конечно, заключается в том, как вы собираетесь обрабатывать результаты.
Если им нужно сериализовать обратно в какой-то один файл, то очевидным подходом было бы создание двух объектов Queue... один для рабочей очереди (процесс приема загружает его, процессы пула потребляют из него), а другой - выходная очередь ( пулы загружаются в него, а затем один процесс использует его для согласованной записи результатов в ваш вывод). Обратите внимание, что возможно, а иногда и весьма эффективно, мультиплексировать главный (принимающий) процесс. Он может чередовать чтение входных данных с опросами в выходной очереди для записи результатов. Но, конечно, вы также можете просто запустить другой процесс, посвященный обработке вывода.
С другой стороны, возможно, что ваши результаты могут быть записаны параллельно, возможно, рабочими процессами. Это нормально, если вы записываете результаты во множество файлов или отправляете их как операторы INSERT или UPDATE в какую-либо базу данных SQL или передаете их в Hadoop HDFS или в Spark DataSet. Существует множество форм вывода, поддающихся параллельной записи.
Также возможно, что вы захотите разделить слои обработки и вывода/обработки результатов. Возможно, ваше приложение будет оптимально настроено с большим количеством процессов на уровне обработки данных и меньшим числом процессов на уровне вывода. (Если обработка каждого элемента интенсивно использует ЦП и у вас, например, много ядер, у вас могут возникнуть проблемы с слишком большим количеством процессов, забивающих ваши каналы ввода-вывода, в то время как ЦП простаивают).
Опять же, используйте очереди. Они предназначены для поддержки согласованности между несколькими производителями и несколькими потребителями. Вы избавлены от беспокойства по поводу блокировки параллелизма, тупиковой и динамической блокировки и так далее.
person
Jim Dennis
schedule
14.02.2018
start()
для обоих, а затем вызвать методjoin()
для обоих. - person John Anderson   schedule 14.02.2018