Чтение и запись сериализованного protobuf в Beam

Я полагаю, должно быть легко записать PCollection сериализованных сообщений protobuf в текстовые файлы и прочитать их обратно. Но после нескольких попыток мне это не удалось. Был бы признателен, если у кого-нибудь есть комментарий.

// definition of proto.

syntax = "proto3";
package test;
message PhoneNumber {
  string number = 1;
  string country = 2;
}

У меня есть код Python ниже, который реализует простой конвейер Beam для записи текстов в сериализованные protobuf.

# Test python code
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

class ToProtoFn(beam.DoFn):
  def process(self, element):
    phone = phone_pb2.PhoneNumber()
    phone.number, phone.country = element.strip().split(',')
    yield phone.SerializeToString()

with beam.Pipeline(options=PipelineOptions()) as p:
  lines = (p 
      | beam.Create(["123-456-789,us", "345-567-789,ca"])
      | beam.ParDo(ToProtoFn())
      | beam.io.WriteToText('/Users/greeness/data/phone-pb'))

Конвейер может быть успешно запущен и создать файл с содержимым:

$ cat ~/data/phone-pb-00000-of-00001 


123-456-789us


345-567-789ca

Затем я кодирую другой конвейер для чтения сериализованных протоколов и их синтаксического анализа с помощью ParDo.

class ToCsvFn(beam.DoFn):
  def process(self, element):
    phone = phone_pb2.PhoneNumber()
    phone.ParseFromString(element)
    yield ",".join([phone.number, phone.country])

with beam.Pipeline(options=PipelineOptions()) as p:
  lines = (p 
      | beam.io.ReadFromText('/Users/greeness/data/phone*')
      | beam.ParDo(ToCsvFn())
      | beam.io.WriteToText('/Users/greeness/data/phone-csv'))

Я получил это сообщение об ошибке при запуске.

  File "/Library/Python/2.7/site-packages/apache_beam/runners/common.py", line 458, in process_outputs
  for result in results:
  File "phone_example.py", line 37, in process
phone.ParseFromString(element)
  File "/Library/Python/2.7/site-packages/google/protobuf/message.py", line 185, in ParseFromString
  self.MergeFromString(serialized)
  File "/Library/Python/2.7/site-packages/google/protobuf/internal/python_message.py", line 1069, in MergeFromString
  raise message_mod.DecodeError('Truncated message.')
  DecodeError: Truncated message. [while running 'ParDo(ToCsvFn)']

Таким образом, похоже, что сериализованную строку protobuf невозможно проанализировать. Я что-то упускаю? Спасибо за любую помощь!


person greeness    schedule 22.01.2018    source источник


Ответы (2)


Я нахожу временное решение через реализованный tfrecordio.py < / а>.

Код ниже работает. Но я все еще открыт для любых комментариев, которые могут решить указанную выше проблему.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

def WriteTextToTFRecord():
  class ToProtoFn(beam.DoFn):
    def process(self, element):
      phone = phone_pb2.PhoneNumber()
      phone.number, phone.country = element.strip().split(',')
      yield phone
  with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | beam.Create(["123-456-789,us", "345-567-789,ca"])
    processed = (
        lines
        | beam.ParDo(ToProtoFn())
        | beam.io.WriteToTFRecord('/Users/greeness/data/phone-pb',
                                  coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber().__class__)))

def ReadTFRecordAndSaveAsCSV():
  class ToCsvFn(beam.DoFn):
    def process(self, element):
      yield ','.join([element.number, element.country])
  with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
      | beam.io.ReadFromTFRecord('/Users/greeness/data/phone-pb-*',
                                 coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber().__class__))
      | beam.ParDo(ToCsvFn())
      | beam.io.WriteToText('/Users/greeness/data/phone-csv'))

if __name__ == '__main__':
  WriteTextToTFRecord()
  ReadTFRecordAndSaveAsCSV()
person greeness    schedule 22.01.2018

TFRecord - это деталь здесь, а это значит, что вы все еще можете заставить его работать с TextIO.

Уловка здесь - это Coder, который используется для кодирования и декодирования типа во время выполнения конвейера. Как правило, вы должны использовать их, если тип не является встроенным / тривиальным. В классе protobuf использование ProtoCoder - это просто правильный поступок.

from google.protobuf.timestamp_pb2 import Timestamp
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class ToProtoFn(beam.DoFn):
  def process(self, element):
    timestamp = Timestamp()
    timestamp.seconds, timestamp.nanos = [int(x) for x in element.strip().split(',')]
    print(timestamp)
    yield timestamp

with beam.Pipeline(options=PipelineOptions()) as p:
  lines = (p 
      | beam.Create(["1586753000,222333000", "1586754000,222333000"])
      | beam.ParDo(ToProtoFn())
      | beam.io.WriteToText('time-pb',
                            coder=beam.coders.ProtoCoder(Timestamp().__class__)))

class ToCsvFn(beam.DoFn):
  def process(self, element):
    print(element)
    yield ",".join([str(element.seconds), str(element.nanos)])

with beam.Pipeline(options=PipelineOptions()) as p:
  lines = (p 
      | beam.io.ReadFromText('time-pb-00000-of-00001',
                              coder=beam.coders.ProtoCoder(Timestamp().__class__))
      | beam.ParDo(ToCsvFn())
      | beam.io.WriteToText('time-csv'),
      )
person dayfine    schedule 13.04.2020