Как в Django вызвать подпроцесс с медленным временем запуска

Предположим, вы используете Django в Linux, и у вас есть представление, и вы хотите, чтобы это представление возвращало данные из подпроцесса с именем cmd, который работает с файлом что создает представление, например вот так:

 def call_subprocess(request):
     response = HttpResponse()

     with tempfile.NamedTemporaryFile("W") as f:
         f.write(request.GET['data']) # i.e. some data

     # cmd operates on fname and returns output
     p = subprocess.Popen(["cmd", f.name], 
                   stdout=subprocess.PIPE, 
                   stderr=subprocess.PIPE)

     out, err = p.communicate()

     response.write(p.out) # would be text/plain...
     return response

Теперь предположим, что cmd имеет очень медленное время запуска, но очень быстрое время работы и изначально не имеет режима демона. Я хотел бы улучшить время отклика этого представления.

Я хотел бы, чтобы вся система работала намного быстрее, запустив несколько экземпляров cmd в рабочем пуле, дождавшись ввода и имея < strong>call_process попросите один из этих процессов рабочего пула обработать данные.

Это действительно 2 части:

Часть 1. Функция, которая вызывает cmd и cmd ожидает ввода. Это можно сделать с помощью труб, т.е.

def _run_subcmd():
    p = subprocess.Popen(["cmd", fname], 
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = p.communicate()
    # write 'out' to a tmp file
    o = open("out.txt", "W")
    o.write(out)
    o.close()
    p.close()
    exit()

def _run_cmd(data):
    f = tempfile.NamedTemporaryFile("W")
    pipe = os.mkfifo(f.name)

    if os.fork() == 0:
        _run_subcmd(fname)
    else:
        f.write(data)

    r = open("out.txt", "r")
    out = r.read()
    # read 'out' from a tmp file
    return out

def call_process(request):
    response = HttpResponse()

    out = _run_cmd(request.GET['data'])

    response.write(out) # would be text/plain...
    return response

Часть 2. Набор рабочих процессов, работающих в фоновом режиме и ожидающих данных. т. е. мы хотим расширить вышеизложенное, чтобы подпроцесс уже выполнялся, например. при инициализации экземпляра Django или первом вызове этого call_process создается набор этих рабочих процессов.

WORKER_COUNT = 6
WORKERS = []

class Worker(object):
    def __init__(index):
        self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name
        os.mkfifo(self.tmp_file.name)
        self.p = subprocess.Popen(["cmd", self.tmp_file], 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.index = index

    def run(out_filename, data):
        WORKERS[self.index] = Null # qua-mutex??
        self.tmp_file.write(data)
        if (os.fork() == 0): # does the child have access to self.p??
            out, err = self.p.communicate()
            o = open(out_filename, "w")
            o.write(out)
            exit()

        self.p.close()
        self.o.close()
        self.tmp_file.close()
        WORKERS[self.index] = Worker(index) # replace this one
        return out_file

    @classmethod
    def get_worker() # get the next worker
    # ... static, incrementing index 

Где-то должна быть какая-то инициализация рабочих, например:

def init_workers(): # create WORKERS_COUNT workers
    for i in xrange(0, WORKERS_COUNT):
        tmp_file = tempfile.NamedTemporaryFile()
        WORKERS.push(Worker(i))

Теперь то, что у меня есть выше, становится примерно таким:

def _run_cmd(data):
     Worker.get_worker() # this needs to be atomic & lock worker at Worker.index

     fifo = open(tempfile.NamedTemporaryFile("r")) # this stores output of cmd

     Worker.run(fifo.name, data)
     # please ignore the fact that everything will be
     # appended to out.txt ... these will be tmp files, too, but named elsewhere.

     out = fifo.read()
     # read 'out' from a tmp file
     return out


def call_process(request):
     response = HttpResponse()

     out = _run_cmd(request.GET['data'])

     response.write(out) # would be text/plain...
     return response

Теперь вопросы:

  1. Будет ли это работать? (Я только что напечатал это в StackOverflow, так что я уверен, что есть проблемы, но концептуально, будет ли это работать)

  2. Какие проблемы искать?

  3. Есть ли лучшие альтернативы этому? например Могут ли потоки работать так же хорошо (это Debian Lenny Linux)? Существуют ли какие-либо библиотеки, которые обрабатывают рабочие пулы параллельных процессов, подобные этой?

  4. Есть ли взаимодействия с Джанго, о которых я должен знать?

Спасибо за чтение! Надеюсь, вы найдете эту проблему столь же интересной, как и я.

Брайан


person Brian M. Hunt    schedule 15.09.2009    source источник


Ответы (3)


Может показаться, что я критикую этот продукт, так как это второй раз, когда я отвечаю с рекомендацией этого.

Но похоже, что вам нужна служба очереди сообщений, в частности распределенная очередь сообщений.

Вот как это будет работать:

  1. Ваше приложение Django запрашивает CMD
  2. CMD добавляется в очередь
  3. CMD подталкивают к нескольким работам
  4. Он выполняется, и результаты возвращаются вверх по течению

Большая часть этого кода уже существует, и вам не нужно создавать собственную систему.

Взгляните на Celery, который изначально был создан с помощью Django.

http://www.celeryq.org/ http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/

person ismail    schedule 16.09.2009
comment
Это интересно - буду разбираться. Однако проблема, которую я могу (или не могу) иметь, заключается в том, что часть 1 шага № 4 (она выполняется, т. Е. Запускается LaTeX) должна произойти до № 2 (CMD добавляется в очередь, т. е. LaTeX получает данные). Тем не менее, я почти уверен, что Celery может это сделать, но для этого потребуется немного покопаться. - person Brian M. Hunt; 21.09.2009

Исси уже упомянул Сельдерей, но, поскольку комментарии плохо работают с примерами кода, вместо этого я отвечу как ответ.

Вы должны попытаться использовать Celery синхронно с хранилищем результатов AMQP. Вы можете распределить фактическое выполнение на другой процесс или даже на другую машину. Выполнение синхронно в сельдерее легко, например:

>>> from celery.task import Task
>>> from celery.registry import tasks

>>> class MyTask(Task):
...
...     def run(self, x, y):
...         return x * y 
>>> tasks.register(MyTask)

>>> async_result = MyTask.delay(2, 2)
>>> retval = async_result.get() # Now synchronous
>>> retval 4

Хранилище результатов AMQP делает отправку результата очень быстрой, но оно доступно только в текущей версии разработки (в заморозке кода до версии 0.8.0).

person asksol    schedule 18.09.2009
comment
Спасибо Ассоль. Одно требование состоит в том, чтобы Task всегда работал как демон, а затем просто отправлял/получал данные от него. LaTeX должен быть запущен до того, как вы вызовете run() (в противном случае вам придется ждать запуска LaTeX, что устраняет всю цель использования очереди задач). Я изучаю Celery, чтобы увидеть, может ли он это сделать (я ожидаю, что он может). - person Brian M. Hunt; 21.09.2009
comment
Я не вижу требования, чтобы LaTeX работал, кроме оптимизации? Для этого вам нужно будет использовать LaTeX C API (или что-то еще), чтобы запустить его, встроенный в рабочий процесс. Это должно быть возможно, но потребует значительной настройки сельдерея. Это может быть хорошей отправной точкой, поскольку она уже решает часть вашей проблемы. Я не говорил, что очередь задач подходит для этого, но часть распределенной/параллельной обработки может подойти. Вам нужен пул задач, и вы хотите отправлять/получать результаты, вы просто хотите, чтобы рабочие процессы были процессорами LaTeX. - person asksol; 22.09.2009

Как насчет «демонизации» вызова подпроцесса с помощью python-daemon или его преемника, седой.

person John Paulett    schedule 16.09.2009
comment
несвязанный, но что он делает лучше? У меня были некоторые проблемы, связанные с python-daemon, но я по своей сути скептически отношусь к коллекциям библиотек. - person asksol; 18.09.2009