Как сделать ReadAllFromText не конвейером Block Beam?

Я хотел бы реализовать очень простой конвейер луча:

read google storage links to text files from PubSub topic->read each text line by line->write to BigQuery.

Apache Beam имеет предварительно реализованный PTransform для каждого процесса.

Таким образом, конвейер будет:

Pipeline | ReadFromPubSub("topic_name") | ReadAllFromText() | WriteToBigQuery("table_name")

Однако ReadAllFromText() каким-то образом блокирует конвейер. Создание собственного PTransform, который возвращает случайную строку после чтения из PubSub и записи ее в таблицу BigQuery, работает нормально (без блокировки). Добавление фиксированного окна в 3 секунды или срабатывание каждого элемента также не решает проблему.

Каждый файл составляет около 10 МБ и 23 тыс. строк.

К сожалению, я не могу найти документацию о том, как должен работать ReadAllFromText. Было бы очень странно, если бы он пытался заблокировать конвейер, пока не прочитает все файлы. И я ожидаю, что функция будет передавать каждую строку в конвейер, как только она прочитает строку.

Есть ли известная причина вышеуказанного поведения? Это баг или я что-то не так делаю?

Код конвейера:

pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromPubSub(subscription=source_dict["input"]) \
                | 'window' >> beam.WindowInto(window.FixedWindows(3, 0)) \
                | ReadAllFromText(skip_header_lines=1)

            elements = lines | beam.ParDo(SplitPayload())

            elements | WriteToBigQuery(source_dict["output"], write_disposition=BigQueryDisposition.WRITE_APPEND)
    .
    .
    .
    class SplitPayload(beam.DoFn):
        def process(self, element, *args, **kwargs):

            timestamp, id, payload = element.split(";")

            return [{
                'timestamp': timestamp,
                'id': id,
                'payload': payload
            }]

person korujzade    schedule 02.09.2018    source источник
comment
Можете ли вы опубликовать MCVE? Я попытался воспроизвести конвейер, который вы описали очень минимально, с помощью чего-то вроде (конвейер | луч.io.ReadFromPubSub(topic) | луч.io.textio.ReadAllFromText() | луч.Map(lambda x: {'i': x}) | beam.io.WriteToBigQuery(output, schema='i:STRING')) И ReadAllFromText не блокирует потоковую передачу. Выполняете ли вы какую-либо другую обработку ваших данных, которая может их заблокировать? Не забудьте также передать флаг --streaming.   -  person rilla    schedule 04.09.2018
comment
@rilla, пожалуйста, проверьте отредактированный ответ. В настоящее время я тестирую с помощью прямого бегуна. Если я опубликую одну GS-ссылку текстового файла в тему и прочитаю ее с помощью ReadFromPubSub, потребуется около 10 секунд, чтобы начать записывать строки из этого файла в таблицу BigQuery. Когда я публикую 20 файлов, а затем начинаю читать, для начала записи в таблицу требуется около 30 секунд. Когда я пробую 360 файлов, для начала записи в таблицу требуется около 90 секунд.   -  person korujzade    schedule 07.09.2018
comment
Так нормально ли то, что ReadAllFromText требуется больше времени, чтобы начать отправлять строки в WriteToBigQuery, когда число PCollections (ссылки GS из ReadFromPubSub) увеличивается? Рн, я не могу протестировать с помощью DataFlow Runner. Может ли это быть из-за прямого бегуна?   -  person korujzade    schedule 07.09.2018
comment
Если я изменю сценарий на ReadFromPubSub | WriteToBigQuery, вышеуказанная проблема не возникнет, и он сразу начнет запись в таблицу. Это подтверждает мое предположение, что ReadAllFromText фактически блокирует конвейер.   -  person korujzade    schedule 09.09.2018
comment
Вы пробовали ReadFromText вместо ReadAllFromText? Похоже, что ReadAllFromText оптимизирован для файлов большего размера, чем ваш вариант использования (1).   -  person Yurci    schedule 10.09.2018
comment
Функция @Yurci ReadFromText принимает исходный файл в качестве строкового параметра, но не PCollection. Однако я получаю ссылки на исходные файлы gs от ReadFromPubSub, что дает PCollections. Я полагаю, что ReadFromText будет полезен в пакетном режиме, так как его можно использовать в качестве входного адаптера.   -  person korujzade    schedule 10.09.2018
comment
@Yurci Кроме того, в указанном ответе упоминается, что ReadAllFromText предназначен для чтения очень большого количества файлов, что и происходит в моей ситуации.   -  person korujzade    schedule 11.09.2018
comment
Вы можете найти код для ReadAllFromText здесь и документацию по чтению входных данных здесь   -  person Yurci    schedule 12.09.2018
comment
Я уже читал, как работает чтение из входных данных с ReadFromText, и это так, как я уже упоминал выше: принимает внешний источник в качестве параметра шаблона файла и возвращает PCollection для каждой строки. И просмотр кода ReadAllFromText не меняет того факта, что он не работает должным образом с DirectRunner. Вы хотите, чтобы я нашел, в чем причина бага или что?   -  person korujzade    schedule 12.09.2018
comment
Привет, @korujzade, можешь ли ты проверить это с помощью Dataflow Runner? На моем конце я не мог воспроизвести это. Вы можете использовать шаблон Cloud Pub/Sub to BigQuery.   -  person Yurci    schedule 18.09.2018
comment
привет @Yurci. Проблема не возникает с DataFlow Runner. Однако пайплайн зависает на пару минут после успешной инициализации воркеров, а после запуска работает нормально. Но я думаю, что это еще одна проблема, которую я должен затронуть с отдельным вопросом.   -  person korujzade    schedule 24.09.2018