Я хотел бы реализовать очень простой конвейер луча:
read google storage links to text files from PubSub topic->read each text line by line->write to BigQuery.
Apache Beam имеет предварительно реализованный PTransform для каждого процесса.
Таким образом, конвейер будет:
Pipeline | ReadFromPubSub("topic_name") | ReadAllFromText() | WriteToBigQuery("table_name")
Однако ReadAllFromText
() каким-то образом блокирует конвейер. Создание собственного PTransform, который возвращает случайную строку после чтения из PubSub и записи ее в таблицу BigQuery, работает нормально (без блокировки). Добавление фиксированного окна в 3 секунды или срабатывание каждого элемента также не решает проблему.
Каждый файл составляет около 10 МБ и 23 тыс. строк.
К сожалению, я не могу найти документацию о том, как должен работать ReadAllFromText
. Было бы очень странно, если бы он пытался заблокировать конвейер, пока не прочитает все файлы. И я ожидаю, что функция будет передавать каждую строку в конвейер, как только она прочитает строку.
Есть ли известная причина вышеуказанного поведения? Это баг или я что-то не так делаю?
Код конвейера:
pipeline_options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromPubSub(subscription=source_dict["input"]) \
| 'window' >> beam.WindowInto(window.FixedWindows(3, 0)) \
| ReadAllFromText(skip_header_lines=1)
elements = lines | beam.ParDo(SplitPayload())
elements | WriteToBigQuery(source_dict["output"], write_disposition=BigQueryDisposition.WRITE_APPEND)
.
.
.
class SplitPayload(beam.DoFn):
def process(self, element, *args, **kwargs):
timestamp, id, payload = element.split(";")
return [{
'timestamp': timestamp,
'id': id,
'payload': payload
}]
ReadFromPubSub
, потребуется около 10 секунд, чтобы начать записывать строки из этого файла в таблицу BigQuery. Когда я публикую 20 файлов, а затем начинаю читать, для начала записи в таблицу требуется около 30 секунд. Когда я пробую 360 файлов, для начала записи в таблицу требуется около 90 секунд. - person korujzade   schedule 07.09.2018ReadAllFromText
требуется больше времени, чтобы начать отправлять строки вWriteToBigQuery
, когда числоPCollections
(ссылки GS изReadFromPubSub
) увеличивается? Рн, я не могу протестировать с помощью DataFlow Runner. Может ли это быть из-за прямого бегуна? - person korujzade   schedule 07.09.2018ReadFromPubSub | WriteToBigQuery
, вышеуказанная проблема не возникнет, и он сразу начнет запись в таблицу. Это подтверждает мое предположение, чтоReadAllFromText
фактически блокирует конвейер. - person korujzade   schedule 09.09.2018ReadFromText
принимает исходный файл в качестве строкового параметра, но не PCollection. Однако я получаю ссылки на исходные файлы gs отReadFromPubSub
, что дает PCollections. Я полагаю, чтоReadFromText
будет полезен в пакетном режиме, так как его можно использовать в качестве входного адаптера. - person korujzade   schedule 10.09.2018ReadAllFromText
предназначен для чтения очень большого количества файлов, что и происходит в моей ситуации. - person korujzade   schedule 11.09.2018ReadFromText
, и это так, как я уже упоминал выше: принимает внешний источник в качестве параметра шаблона файла и возвращает PCollection для каждой строки. И просмотр кодаReadAllFromText
не меняет того факта, что он не работает должным образом с DirectRunner. Вы хотите, чтобы я нашел, в чем причина бага или что? - person korujzade   schedule 12.09.2018