Как извлечь содержимое из PCollection в Cloud Dataflow?

Просто хотите знать, как извлечь вещи из PCollection? Скажем, я применил Count.Globally, поэтому в результирующей коллекции PCollection есть одно число, но как я могу извлечь его как длинное значение?

Спасибо.


person darkjh    schedule 18.01.2015    source источник


Ответы (2)


Это зависит от того, как вы хотите использовать это значение.

Если вы хотите прочитать это значение после завершения конвейера, вы можете использовать одно из преобразований записи (например, AvroIO.Write), чтобы записать его в какой-либо вывод, который вы могли бы затем прочитать из любого кода, выполняемого после завершения вашего конвейера.

Если вы хотите использовать это значение в последующей части конвейера, вы можете применить View для создания PCollectionView, который затем можно передать в качестве побочного ввода для других преобразований.

Рассмотрим простой пример, где цель состоит в том, чтобы распечатать Count. Count не будет доступен до тех пор, пока конвейер не запустится. Итак, в этом случае мы могли бы сделать следующее

  • Определите DoFn‹Long, String>, который мы применяем к счетчику, чтобы превратить Long в сообщение, которое мы хотим распечатать.
  • Примените преобразование TextIO.Write, чтобы записать сообщение в файл.
  • Запустите задание и дождитесь его завершения. Если мы хотим выполнить с помощью службы потока данных, мы можем использовать BlockingDataflowRunner, чтобы дождаться завершения задания.
  • После завершения задания прочитайте текстовый файл, созданный для получения сообщения, и распечатайте его.
person Jeremy Lewi    schedule 19.01.2015
comment
Я думаю, что хочу получить значение Long и использовать его внутри программы. Say используется в операторе if. - person darkjh; 19.01.2015
comment
Я добавил пример, это отвечает на ваш вопрос? Важно то, что коллекция PCollection не материализуется до тех пор, пока не запустится конвейер. Поэтому вам нужно запустить конвейер, чтобы получить доступ к значению. Конвейеры также могут (в зависимости от исполнителя) работать асинхронно и независимо от вашей основной программы. - person Jeremy Lewi; 20.01.2015
comment
Джереми, неужели нет прямого способа получить доступ к счетчику, даже если предположить, что мы уже запустили конвейер? Кажется, действительно косвенно записать его на диск, а затем прочитать его обратно. - person emchristiansen; 28.01.2016
comment
Вы можете материализовать его в своем конвейере, но не в своей основной программе. Ваша основная программа строит граф преобразований. Если вы выполняете этот граф в службе потока данных, то вычисления фактически выполняются на разных машинах, а не на той, на которой работает ваша основная программа. Таким образом, чтобы ваша основная программа могла читать данные, вам нужно использовать преобразование ввода-вывода, чтобы записать их в доступное из вашей основной программы место (например, File/BigQuery/Datastore/Pubsub). Однако, если вы хотите прочитать его в более позднем преобразовании, Dataflow передаст значение для вас. - person Jeremy Lewi; 29.01.2016

Вы всегда должны думать о PCollection как о потоке. Тот факт, что вы применили преобразование, создающее одно значение для каждого окна, не гарантирует, что на самом деле существует только одно значение. Это зависит от стратегии управления окнами, поэтому в случае использования GlobalWindow может быть одно значение, но будет много значений для других типов оконных функций (например, скользящих окон).

Поэтому невозможно извлечь это единственное значение напрямую (например, что-то вроде PCollection.get()) — возвращаемое значение должно быть потоком. Если вы хотите получить результат из PCollection, вы должны применить к нему преобразование, которое сохранит его где-то. Существует богатый набор встроенных модулей ввода-вывода (см. здесь) . Если вы хотите получить результирующие значения и использовать их в программе позже, лучшим вариантом будет сохранить их в какой-либо общей базе данных по вашему выбору и получить это значение после завершения вашего конвейера. Обратите внимание, что это означает, что ваш конвейер ограничен (например, пакетный, а не потоковый), иначе он никогда не завершится. Но ваш вопрос предполагает, что вы имеете в виду ограниченный конвейер.

person Jan Lukavsky    schedule 22.04.2021