Маскируйте данные, поступающие из потока Kafka

Я использую искровую структурированную потоковую передачу для потоковой передачи данных из kafka, что дает мне фрейм данных со схемой ниже

Column     Type
key        binary
value      binary
topic      string
partition  int
offset     long
timestamp  long
timestampType   int

Значение Colum поступает сюда в двоичном формате, но на самом деле это строка json с типом структуры, и требуется прочитать структуру json и замаскировать несколько полей, входящих в нее, и записать данные.




Ответы (1)


Вы можете следовать рекомендациям, приведенным в Руководстве по интеграции структурированной потоковой передачи и Kafka., чтобы понять, как преобразовать двоичное значение в строковое значение.

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

df.selectExpr("CAST(value AS STRING)")
  .as[String]

Затем вы можете определить свою схему в зависимости от вашей фактической структуры json, например:

val schema: StructType = new StructType()
    .add("field1", StringType)
    .add("field2", ArrayType(new StructType()
      .add("f2", StringType)
      .add("f2", DoubleType)
    ))

Затем использование функции from_json позволит вам обрабатывать данные в строке JSON, см. документация, например:

df.selectExpr("CAST(value AS STRING)")
  .select(from_json('json, schema).as("data"))

После этого вы можете начать маскировку, заменив столбцы с помощью структурированного API, такого как withColumn и drop.

Если вы не хотите определять всю схему, вы можете подумать о работе с get_json_object.

person mike    schedule 16.10.2020
comment
был ли вопрос больше не о маскировке? - person thebluephantom; 16.10.2020
comment
Позвольте мне добавить некоторые детали к вопросу. Я хочу, чтобы данные в потоке маскировались в реальном времени, а затем выгружались на уровне хранилища. - person Harshit; 17.10.2020