Разделение строки сообщения Kafka на строку в структурированной потоковой передаче Spark

Я хочу прочитать сообщение из темы Kafka в моем задании Spark Structured Streaming во фрейме данных. но я получаю все сообщение в одном смещении, поэтому во фрейме данных только это сообщение попадает в одну строку, а не в несколько строк. (в моем случае это 3 ряда)

Когда я печатаю это сообщение, я получаю следующий результат:

введите описание изображения здесь

Сообщение «Text1», «Text2» и «Text3» я хочу разместить в 3 строках во фрейме данных, чтобы я мог обрабатывать дальше.

Пожалуйста помогите.


person Atanu chatterjee    schedule 08.02.2019    source источник


Ответы (1)


вы можете использовать определяемую пользователем функцию (UDF) для преобразования строки сообщения в последовательность строк, а затем применить функцию explode к этому столбцу, чтобы создать новую строку для каждого элемента в последовательности:

Как показано ниже (в scala тот же принцип применяется к pyspark):

case class KafkaMessage(offset: Long, message: String)

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode

val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()

val splitString = udf { s: String => s.split('\n') }

df.withColumn("splitMsg", explode(splitString($"message")))
  .select("offset", "splitMsg")
  .show()

это даст следующий результат:

+------+--------+
|offset|splitMsg|
+------+--------+
|  1000|   Text1|
|  1000|   Text2|
|  1000|   Text3|
+------+--------+
person Joachim    schedule 22.02.2019