Проблемы преобразования Avro Kafka между scala и Python

В нашем проекте есть код scala и python, и нам нужно отправлять / использовать сообщения в кодировке avro в kafka.

Я отправляю сообщения avro-кодирования в kafka с помощью python и scala. У меня есть производитель в коде scala, который отправляет сообщения в кодировке avro с использованием библиотеки биекций Twitter следующим образом:

val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString
val schema = parser.parse(schemaFile)
val recordInjection = GenericAvroCodecs[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
avroRecord.put("url_sha256", row._1)
avroRecord.put("url", row._2._1)
avroRecord.put("timestamp", row._2._2)
val recordBytes = recordInjection.apply(avroRecord)
kafkaProducer.value.send("topic", recordBytes)

Схема Avro выглядит как

{
  "namespace": "com.rm.avro",
  "type": "record",
  "name": "url_info",
  "fields":[
     {
        "name": "url_sha256", "type": "string"
     },
     {
        "name": "url",  "type": "string"
     },
     {
        "name": "timestamp", "type": ["long"]
     }
 ]

}

Я могу успешно его декодировать в KafkaConsumer в scala

val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString


kafkaInputStream.foreachRDD(kafkaRDD => {
  kafkaRDD.foreach(

    avroRecord => {
      val parser = new Schema.Parser()
      val schema = parser.parse(schemaFile)
      val recordInjection = GenericAvroCodecs[GenericRecord](schema)
      val record = recordInjection.invert(avroRecord.value()).get
      println(record)
    }
  )

}

Однако я не могу декодировать сообщения в python, я получаю следующее исключение

'utf8' codec can't decode byte 0xe4 in position 16: invalid continuation byte

коды Python выглядят следующим образом: schema_path = "avro / url_info_schema.avsc" schema = avro.schema.parse (open (schema_path) .read ())

for msg in consumer:
   bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    decoded_msg = reader.read(decoder)
    print(decoded_msg)

Также сообщения производителя python avro не понимаются потребителем scala avro. У меня тут исключение. Производитель Python Avro выглядит следующим образом:

datum_writer = DatumWriter(schema)
bytes_writer = io.BytesIO()

datum_writer = avro.io.DatumWriter(schema)
encoder = avro.io.BinaryEncoder(bytes_writer)
datum_writer.write(data, encoder) 
raw_bytes = bytes_writer.getvalue()
producer.send(topic, raw_bytes)

Как сохранить единообразие между Python и Scala? Любые указатели будут отличными


person Abhishek    schedule 11.02.2017    source источник
comment
разобрался с решением. Скоро опубликую решение. Это может помочь другим.   -  person Abhishek    schedule 11.02.2017


Ответы (1)


Я использовал двоичный кодировщик в python и ничего в Scala. Просто нужно было изменить одну строку с

val recordInjection = GenericAvroCodecs[GenericRecord](schema)

to

val recordInjection = GenericAvroCodecs.toBinary[GenericRecord](schema)

Я надеюсь, что другие сочтут это полезным. Никаких изменений в коде Python не требуется

person Abhishek    schedule 11.02.2017