Вы можете следовать рекомендациям, приведенным в Руководстве по интеграции структурированной потоковой передачи и Kafka., чтобы понять, как преобразовать двоичное значение в строковое значение.
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(value AS STRING)")
.as[String]
Затем вы можете определить свою схему в зависимости от вашей фактической структуры json, например:
val schema: StructType = new StructType()
.add("field1", StringType)
.add("field2", ArrayType(new StructType()
.add("f2", StringType)
.add("f2", DoubleType)
))
Затем использование функции from_json
позволит вам обрабатывать данные в строке JSON, см. документация, например:
df.selectExpr("CAST(value AS STRING)")
.select(from_json('json, schema).as("data"))
После этого вы можете начать маскировку, заменив столбцы с помощью структурированного API, такого как withColumn
и drop
.
Если вы не хотите определять всю схему, вы можете подумать о работе с get_json_object
.
person
mike
schedule
16.10.2020