SDK Apache Beam Python с исходным кодом Pub / Sub зависает во время выполнения

Я пишу программу на Apache Beam с использованием Python SDK для чтения из Pub / Sub содержимого файла JSON и выполнения некоторой обработки полученной строки. Это часть программы, в которой я извлекаю содержимое из Pub / Sub и выполняю обработку:

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
    lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))

    lines_split = lines_decoded | (beam.FlatMap(lambda x: x.split('\n')))

    def json_to_tuple(jsonStr):
        res = json.loads(jsonStr)
        ##printing retutn value
        print (res['id'], res['messageSize'])
        ##
        return (res['id'], res['messageSize'])

    tupled = lines_split | beam.Map(json_to_tuple)

    def printlines(line):
        print line

    result = tupled | beam.CombinePerKey(sum)
    result | beam.Map(printlines)

При запуске программы код застревает после создания PCollection tupled (после этого ни одна строка кода не выполняется). Странно то, что когда я меняю источник с Pub / Sub на локальный файл, содержащий точно такое же содержимое (с использованием ReadFromText()), программа работает отлично. В чем может быть причина такого поведения?


person Arjun Kay    schedule 19.03.2018    source источник


Ответы (1)


Согласно документации ввода-вывода Pub / Sub (как документы Apache Beam и Dataflow Pub / Sub I / O docs), по умолчанию преобразования PubsubIO работают с неограниченными коллекциями PCollections.

Коллекции PCollections могут быть ограниченными или неограниченными:

  • Ограниченный: данные поступают из фиксированного источника, например файла.
  • Без ограничений: данные поступают из источника, который постоянно обновляется, например из подписки Pub / Sub.

Прежде чем работать с неограниченной коллекцией PCollection, необходимо использовать одну из следующих стратегий:

  • Работа с окнами: неограниченные коллекции PCollections нельзя напрямую использовать для преобразования группировки (например, используемого вами CombinePerKey), поэтому сначала следует установить неглобальную оконную функцию.
  • Триггеры: вы можете настроить триггер для неограниченной коллекции PCollection таким образом, что она обеспечивает периодические обновления неограниченного набора данных, даже если данные в подписке все еще передаются.

Это может объяснить наблюдаемое вами поведение, т.е. тот же конвейер работает, когда он читает из локального файла (который является ограниченным источником данных), но не работает, когда он читает из подписки Pub / Sub (который является неограниченным источником данных).

Следовательно, чтобы работать с подпиской Pub / Sub, вы должны применить оконную стратегию или стратегию запуска, чтобы данные в коллекциях PCollections могли быть правильно обработаны в следующих преобразованиях.

РЕДАКТИРОВАТЬ: Кроме того, как выяснил @Arjun, может потребоваться включить потоковую передачу в конвейере с опцией, установив соответствующий параметр arg с помощью следующей команды:

pipeline_options.view_as(StandardOptions).streaming = True
person dsesto    schedule 24.03.2018
comment
Спасибо. Код заработал. Но одно только оконное управление не помогло. Добавление этой строки решило проблему pipeline_options.view_as(StandardOptions).streaming = True. Есть идеи, что это делает? - person Arjun Kay; 26.03.2018
comment
Согласно документации класс PipelineOptions - это не что иное, как оболочка для анализа аргументов. В этом случае вы установка для параметра streaming значения true, что включает Streaming Piplines, который вы используете. - person dsesto; 02.04.2018
comment
Понятно. Спасибо. - person Arjun Kay; 02.04.2018
comment
Не стесняйтесь принять ответ, если вы сочли его полезным, чтобы сообщество сочло этот вопрос решенным. Спасибо! - person dsesto; 02.04.2018
comment
Я буду. Я предлагаю добавить к вашему ответу вышеупомянутый пункт, то есть установить для параметра потоковой передачи значение true, потому что проблема сохраняется, когда это не сделано. - person Arjun Kay; 02.04.2018