Просто хотите знать, как извлечь вещи из PCollection? Скажем, я применил Count.Globally, поэтому в результирующей коллекции PCollection есть одно число, но как я могу извлечь его как длинное значение?
Спасибо.
Просто хотите знать, как извлечь вещи из PCollection? Скажем, я применил Count.Globally, поэтому в результирующей коллекции PCollection есть одно число, но как я могу извлечь его как длинное значение?
Спасибо.
Это зависит от того, как вы хотите использовать это значение.
Если вы хотите прочитать это значение после завершения конвейера, вы можете использовать одно из преобразований записи (например, AvroIO.Write), чтобы записать его в какой-либо вывод, который вы могли бы затем прочитать из любого кода, выполняемого после завершения вашего конвейера.
Если вы хотите использовать это значение в последующей части конвейера, вы можете применить View для создания PCollectionView, который затем можно передать в качестве побочного ввода для других преобразований.
Рассмотрим простой пример, где цель состоит в том, чтобы распечатать Count. Count не будет доступен до тех пор, пока конвейер не запустится. Итак, в этом случае мы могли бы сделать следующее
Вы всегда должны думать о PCollection
как о потоке. Тот факт, что вы применили преобразование, создающее одно значение для каждого окна, не гарантирует, что на самом деле существует только одно значение. Это зависит от стратегии управления окнами, поэтому в случае использования GlobalWindow может быть одно значение, но будет много значений для других типов оконных функций (например, скользящих окон).
Поэтому невозможно извлечь это единственное значение напрямую (например, что-то вроде PCollection.get()
) — возвращаемое значение должно быть потоком. Если вы хотите получить результат из PCollection, вы должны применить к нему преобразование, которое сохранит его где-то. Существует богатый набор встроенных модулей ввода-вывода (см. здесь) . Если вы хотите получить результирующие значения и использовать их в программе позже, лучшим вариантом будет сохранить их в какой-либо общей базе данных по вашему выбору и получить это значение после завершения вашего конвейера. Обратите внимание, что это означает, что ваш конвейер ограничен (например, пакетный, а не потоковый), иначе он никогда не завершится. Но ваш вопрос предполагает, что вы имеете в виду ограниченный конвейер.