Эффективное чтение строк из сжатого, разбитого на фрагменты потока HTTP по мере их поступления.

Я написал HTTP-сервер, который создает бесконечные потоки HTTP, состоящие из событий, структурированных в формате JSON. Подобно потоковому API Twitter. Эти события разделены \n (согласно событиям, отправленным сервером с Content-Type:text /event-stream) и может различаться по длине.

Ответ

  • фрагментировано (HTTP 1.1 Transfer-Encoding: фрагментировано) из-за бесконечного потока
  • сжатый (Content-Encoding: gzip) для экономии полосы пропускания.

Я хочу использовать эти строки в Python, как только они появятся, и максимально эффективно использовать ресурсы, не изобретая велосипед.

Поскольку я сейчас использую python-запросы, знаете ли вы, как заставить его работать? Если вы думаете, что python-запросы здесь не помогут, я полностью открыт для альтернативных фреймворков/библиотек.

Моя текущая реализация основана на запросах и использует iter_lines(...) для получения строк. Но параметр chunk_size хитрый. Если установлено значение 1, это сильно загружает процессор, поскольку некоторые события могут занимать несколько килобайт. Если установлено любое значение выше 1, некоторые события застревают до прихода следующего, и весь буфер «заполняется». А время между событиями может составлять несколько секунд. Я ожидал, что chunk_size будет своего рода «максимальным количеством байтов для получения», как в recv(...) в Unix. Соответствующая справочная страница гласит:

Вызовы получения обычно возвращают любые доступные данные вплоть до запрошенной суммы, а не ждут получения всей запрошенной суммы.

Но это явно не так, как это работает в библиотеке запросов. Они используют его более или менее как «точное количество байтов для получения». Глядя на их исходный код, я не мог определить, какая часть отвечает за это. Возможно, ответ httplib или SSLSocket ssl.

В качестве обходного пути я попытался дополнить свои строки на сервере кратным размеру фрагмента. Но размер фрагмента в библиотеке запросов используется для выборки байтов из сжатого потока ответов. Так что это не сработает, пока я не смогу дополнить свои строки так, чтобы их сжатая последовательность байтов была кратна размеру фрагмента. Но это кажется слишком хакерским.

Я читал, что Twisted можно использовать для неблокирующей и небуферизованной обработки http-потоков на клиенте, но я нашел код только для создания потоковых ответов на сервере.


person Thomas B.    schedule 15.02.2014    source источник
comment
Знаете ли вы хороший фреймворк/библиотеку для этой задачи? Боюсь, запросы библиотеки не по теме.   -  person Martijn Pieters    schedule 15.02.2014
comment
Извините за запрос библиотеки. Как написано, в настоящее время я использую python-requests и хотел бы продолжать его использовать. Итак, мой вопрос в основном о том, как делать что-то с python-запросами. Но: если нет возможности, я вполне согласен с использованием другой библиотеки.   -  person Thomas B.    schedule 15.02.2014
comment
Возможен поиск строк. Лежащая в основе сжатия zlib поддерживает сброс (Z_SYNC_FLUSH), который используется, когда я сбрасываю ответ в Tornado в потоке ответов GZipped. Таким образом, http-поток в полном порядке, разбит на идеальные куски с полными сжатыми строками. Просто прочитать их обратно в Python сложно.   -  person Thomas B.    schedule 15.02.2014
comment
Мой анализ был неверным; Я пишу больше информации ниже. Это не ограничение, которое requests может обойти.   -  person Martijn Pieters    schedule 15.02.2014
comment
Однажды я сделал что-то подобное, в основном вырвав всю обработку сокета (ОСОБЕННО socket.makefile), чтобы получить правильную обработку буфера, и использовал select() для постепенного чтения данных из сокета. Работал нормально, но был основным PITA, чтобы все исправить. По сути, как только вы получите ответ, извлеките сокет из ответа и обработайте его самостоятельно. Это снизило загрузку ЦП с 90% до 2% и немного увеличило пропускную способность. (но это было с древним Python 2.2, 2.7+ httplib лучше работает с фрагментированным кодированием).   -  person schlenk    schedule 15.02.2014


Ответы (2)


Благодаря Martijn Pieters answer я перестал работать с поведением запросов python и искал совершенно другой подход.

В итоге я использовал pyCurl. Вы можете использовать его аналогично циклу select + recv, не инвертируя поток управления и не передавая управление выделенному циклу ввода-вывода, как в Tornado и т. д. Таким образом, легко использовать генератор, который выдает новые строки, как только они приходят. - без дополнительной буферизации в промежуточных слоях, которая может привести к задержке или дополнительным потокам, запускающим цикл ввода-вывода.

В то же время он достаточно высокоуровневый, поэтому вам не нужно беспокоиться о кодировании передачи по частям, шифровании SSL или сжатии gzip.

Это был мой старый код, где chunk_size=1 приводил к загрузке процессора на 45%, а chunk_size>1 приводил к дополнительной задержке.

import requests
class RequestsHTTPStream(object):
    def __init__(self, url):
        self.url = url

    def iter_lines(self):
        headers = {'Cache-Control':'no-cache',
                   'Accept': 'text/event-stream',
                   'Accept-Encoding': 'gzip'}
        response = requests.get(self.url, stream=True, headers=headers)
        return response.iter_lines(chunk_size=1)

Вот мой новый код, основанный на pyCurl: (К сожалению, стиль curl_easy_* perform полностью блокируется, что затрудняет вывод строк между ними без использования потоков. Поэтому я использую методы curl_multi_*)

import pycurl
import urllib2
import httplib
import StringIO

class CurlHTTPStream(object):
    def __init__(self, url):
        self.url = url
        self.received_buffer = StringIO.StringIO()

        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)

        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)

        self.status_code = 0

    SELECT_TIMEOUT = 10

    def _any_data_received(self):
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        if self.status_code == 0:
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != httplib.OK:
            raise urllib2.HTTPError(self.url, self.status_code, None, None, None)

    def _perform_on_curl(self):
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

    def _iter_chunks(self):
        while True:
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            if remaining == 0:
                break
            self.curlmulti.select(self.SELECT_TIMEOUT)

        self._check_status_code()
        self._check_curl_errors()

    def _check_curl_errors(self):
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        #same behaviour as requests' Response.iter_lines(...)

        pending = None
        for chunk in chunks:

            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()

            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None

            for line in lines:
                yield line

        if pending is not None:
            yield pending

Этот код пытается получить как можно больше байтов из входящего потока, без ненужной блокировки, если их немного. Для сравнения, загрузка процессора составляет около 0,2%.

person Thomas B.    schedule 16.02.2014
comment
Не приведет ли это self.received_buffer.truncate(0) к потере данных? Я имею в виду данные, которые записываются в self.received_buffer методом обратного вызова (self.received_buffer.write), но еще не были прочитаны self.received_buffer.getvalue()? - person Amit; 09.04.2016
comment
Запись не происходит асинхронно, WRITEFUNCTION вызывается только внутри Perform(). Когда вы должны вызывать execute для чтения/записи данных, это указывается select(), чтобы не опрашивать без необходимости. - person Thomas B.; 10.04.2016
comment
Хорошо сделано! В любом случае, вы можете добавить пример того, как вы его использовали? Показ того, как вы можете использовать его с progressbar.ProgressBar, также будет очень признателен. - person Rob Sawyer; 31.03.2017

Это не requests' вина, что ваши iter_lines() вызовы блокируются.

Метод Response.iter_lines() вызывает Response.iter_content(), что звонки urllib3HTTPResponse.stream(), который вызывает HTTPResponse.read().

Эти вызовы проходят по размеру фрагмента, который передается в сокет как self._fp.read(amt). Это проблематичный вызов, так как self._fp — это файловый объект, созданный socket.makefile() ( как это делает модуль httplib); и этот вызов .read() будет блокироваться до тех пор, пока не будет прочитано amt (сжатых) байтов.

Этот низкоуровневый файловый объект сокета поддерживает вызов .readline(), который будет работать более эффективно, но urllib3 не может использовать этот вызов при обработке сжатых данных; терминаторы строк не будут видны в сжатом потоке.

К сожалению, urllib3 не будет вызывать self._fp.readline(), если ответ также не сжат; то, как структурированы вызовы, было бы трудно передать, что вы хотите прочитать в режиме буферизации строк, а не в режиме буферизации фрагментов, как это есть.

Я должен сказать, что HTTP — не лучший протокол для потоковой передачи событий; Я бы использовал другой протокол для этого. На ум приходят веб-сокеты или собственный протокол для вашего конкретного случая использования.

person Martijn Pieters    schedule 15.02.2014
comment
Спасибо, что указали, что проблема кроется глубоко в стеке http, используемом запросами! - person Thomas B.; 15.02.2014
comment
@ThomasB.: Да, и я не знаю библиотек, которые бы решили эту проблему чистым способом. Сжатие здесь не помогает; вам нужно будет использовать urllib2, затем получить доступ к необработанному сокету из объекта ответа, выполнить неблокирующие чтения и выполнить собственную распаковку. Не красиво. - person Martijn Pieters; 15.02.2014
comment
Я выбрал SSE после прочтения SSE vs WebSockets. Мне нужен только нисходящий поток. Клиент JS очень прост, сервер очень прост, а обратное проксирование, шифрование и сжатие просто отлично работают по HTTP. - person Thomas B.; 15.02.2014
comment
Я перешел на PyCurl. Он прозрачно обрабатывает фрагментацию, сжатие, проверку ssl и т. д. и гораздо более отзывчив (относительно отложенности буфера). Я обновлю свой вопрос, чтобы включить мои выводы. - person Thomas B.; 15.02.2014
comment
@ThomasB.: Вместо этого я бы сделал это самостоятельным ответом. - person Martijn Pieters; 15.02.2014