Я пишу программу на Apache Beam с использованием Python SDK для чтения из Pub / Sub содержимого файла JSON и выполнения некоторой обработки полученной строки. Это часть программы, в которой я извлекаю содержимое из Pub / Sub и выполняю обработку:
with beam.Pipeline(options=PipelineOptions()) as pipeline:
lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))
lines_split = lines_decoded | (beam.FlatMap(lambda x: x.split('\n')))
def json_to_tuple(jsonStr):
res = json.loads(jsonStr)
##printing retutn value
print (res['id'], res['messageSize'])
##
return (res['id'], res['messageSize'])
tupled = lines_split | beam.Map(json_to_tuple)
def printlines(line):
print line
result = tupled | beam.CombinePerKey(sum)
result | beam.Map(printlines)
При запуске программы код застревает после создания PCollection tupled
(после этого ни одна строка кода не выполняется). Странно то, что когда я меняю источник с Pub / Sub на локальный файл, содержащий точно такое же содержимое (с использованием ReadFromText()
), программа работает отлично. В чем может быть причина такого поведения?