Сценарий многопроцессорности Python не завершается

Я пытаюсь немного освоиться с многопроцессорным модулем python2.7. Поэтому я написал небольшой скрипт, который принимает имена файлов и желаемое количество процессов в качестве входных данных, а затем запускает несколько процессов, чтобы применить функцию к каждому имени файла в моей очереди. Это выглядит так:

import multiprocessing, argparse, sys
from argparse import RawTextHelpFormatter

def parse_arguments():
    descr='%r\n\nTest different functions of multiprocessing module\n%r' % ('_'*80, '_'*80)
    parser=argparse.ArgumentParser(description=descr.replace("'", ""), formatter_class=RawTextHelpFormatter)
    parser.add_argument('-f', '--files', help='list of filenames', required=True, nargs='+')
    parser.add_argument('-p', '--processes', help='number of processes for script', default=1, type=int)
    args=parser.parse_args()
    return args 

def print_names(name):
    print name


###MAIN###

if __name__=='__main__':
    args=parse_arguments()
    q=multiprocessing.Queue()
    procs=args.processes
    proc_num=0
    for name in args.files:
        q.put(name)
    while q.qsize()!=0:
        for x in xrange(procs):
            proc_num+=1
            file_name=q.get()
            print 'Starting process %d' % proc_num
            p=multiprocessing.Process(target=print_names, args=(file_name,))
            p.start()
            p.join()
            print 'Process %d finished' % proc_num

Скрипт работает нормально и запускает новый процесс каждый раз, когда завершается старый процесс (думаю, так оно и работает?), пока все объекты в очереди не будут израсходованы. Однако скрипт не завершается после завершения очереди, а сидит без дела, и мне приходится его убивать с помощью Ctrl+C. В чем проблема?

Спасибо за ваши ответы!


person sequence_hard    schedule 22.12.2016    source источник
comment
Почему вы используете очередь? Он не используется совместно дочерними процессами, вы также можете перебирать args.files.   -  person cdarke    schedule 22.12.2016
comment
Раньше я перебирал список файлов, используя multiprocessing.Pool(), чтобы создать несколько процессов. Однако для больших списков файлов количество процессов в конце концов сокращалось, поскольку (я думаю, именно это и происходит) некоторые процессы быстрее завершают работу со своим списком задач, когда файлы меньше. Здесь я пытаюсь использовать очередь для запуска нового процесса с новым файлом каждый раз, когда процесс завершается, чтобы оптимизировать скорость моей программы.   -  person sequence_hard    schedule 22.12.2016


Ответы (1)


Похоже, вы перепутали там несколько вещей. Вы запускаете процесс, заставляете его выполнять свою работу и ждете его выхода, прежде чем запускать новый процесс в следующей итерации. Используя этот подход, вы застреваете в последовательной обработке, здесь не выполняется фактическая многопроцессорность.

Может быть, вы хотите взять это в качестве отправной точки:

import sys
import os
import time
import multiprocessing as mp

def work_work(q):
    # Draw work from the queue
    item = q.get()
    while item:
        # Print own process id and the item drawn from the queue
        print(os.getpid(), item)
        # Sleep is only for demonstration here. Usually, you 
        # do not want to use this! In this case, it gives the processes
        # the chance to "work" in parallel, otherwise one process
        # would have finished the entire queue before a second one
        # could be spawned, because this work is quickly done.
        time.sleep(0.1)
        # Draw new work
        item = q.get()

if __name__=='__main__':
    nproc = 2  # Number of processes to be used
    procs = [] # List to keep track of all processes

    work = [chr(i + 65) for i in range(5)]
    q = mp.Queue() # Create a queue...
    for w in work:
        q.put(w) # ...and fill it with some work.

    for _ in range(nproc):
        # Spawn new processes and pass each of them a reference
        # to the queue where they can pull their work from.
        procs.append(mp.Process(target=work_work, args=(q,)))
        # Start the process just created.
        procs[-1].start()

    for p in procs:
        # Wait for all processes to finish their work. They only
        # exit once the queue is empty.
        p.join()
person jbndlr    schedule 22.12.2016
comment
Хорошо, я вижу, как объединение процессов в список и завершение их, когда все сделано, действительно делает то, что я хотел сделать. Крошечный вопрос: когда процесс завершается с элементом из очереди, но очередь все еще заполнена, будет ли он просто выбирать другой, чтобы я непрерывно запускал свою программу с тем же количеством процессов, пока очередь не станет пустой? - person sequence_hard; 22.12.2016
comment
Да, именно это здесь и произойдет. - person jbndlr; 25.12.2016
comment
Эй, на самом деле я проверил ваш код, и проблема остается. Процессы выполняются и очищают очередь, но они не заканчиваются, вместо этого они засыпают, и скрипт останавливается на этом... - person sequence_hard; 19.01.2017
comment
Это связано с конкретной реализацией функции work_work: while item: прерывается, как только очередь становится пустой. Замените на while True: и добавьте time.sleep(100), если item было None. - person jbndlr; 19.01.2017