Запись в динамическое место назначения в облачное хранилище в потоке данных в Python

Я пытался читать из большого файла в облачном хранилище и сегментировать их в соответствии с заданным полем.

Я планирую прочитать | Map(лямбда x: (x[ключевое поле], x)) | группа по ключу | Запись в файл с именем ключевого поля.

Однако я не смог найти способ динамической записи в облачное хранилище. Поддерживается ли этот функционал?

Спасибо, Ицин.


person yiqing_hua    schedule 15.02.2018    source источник


Ответы (2)



Экспериментальная запись была добавлена ​​в SDK Beam python в версии 2.14.0, beam.io.fileio.WriteToFiles:

my_pcollection | beam.io.fileio.WriteToFiles(
      path='/my/file/path',
      destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
      sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
      file_naming=beam.io.fileio.destination_prefix_naming())

который можно использовать для записи в разные файлы для каждой записи.

Вы можете пропустить GroupByKey, просто используйте destination, чтобы решить, в какой файл записывается каждая запись. Возвращаемое значение destination должно быть значением, которое можно сгруппировать.

Больше документации здесь:

https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.fileio.html#dynamic-destinations

И проблема JIRA здесь:

https://issues.apache.org/jira/browse/BEAM-2857

person anrope    schedule 18.08.2019