Kafka createDirectStream в Spark Streaming

Я пробую пример кода из Spark Streaming + Kafka. Руководство по интеграции (брокер Kafka версии 0.10.0 или выше). Код может работать без ошибок, но я не могу получить какую-либо запись. Если я запущу kafka-console-consumer.sh --from-beginning, я смогу получить записи. Кто-нибудь знает причину? Мой код выглядит следующим образом:

val broker = "221.181.73.44:19092"
val topics = Array("connect-test")
val groupid = "SparkStreamingLoad3"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> broker,
  "group.id" -> groupid,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest", //earliest | latest
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

stream.print()

ssc.start()
ssc.awaitTermination()

Моя сборка SBT:

version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.1.0",
  "org.apache.spark" % "spark-core_2.10" % "2.1.0",
"org.apache.spark" % "spark-streaming_2.10" % "2.1.0",
"org.apache.kafka" % "kafka_2.10" % "0.10.2.1"
)

Спасибо!


person rsmin    schedule 22.06.2017    source источник
comment
У вас работает Kafka версии 0.10.x?   -  person maasg    schedule 22.06.2017
comment
Версия Kafka, работающая на сервере, — 0.10.2.1. В папке libs у меня есть файлы kafka_2.10-0.10.2.1.*. Версия такая же, как и в конфигурациях сборки SBT.   -  person rsmin    schedule 22.06.2017


Ответы (2)


Наконец-то я решил проблему. Вот ответ:

  1. Данные в теме генерируются производителем консоли, который представляет собой список строк. Однако данные имеют формат [Массив[Байт], Массив[Байт]]. Не [строка, строка]. Поэтому, если я использую StringDeserializer, данные не будут получены.

  2. Я узнал из исходного кода консоли-потребителя writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit

Ключ/значение в RDD может содержать нулевые значения. В моем случае все ключи нулевые. Я использую следующий код для получения данных:

stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics, kafkaParams)) stream.map(rdd=>new String(Option (rdd.key()).getOrElse("null".getBytes))+ "|||разделитель|||"+new String(Option(rdd.value()).getOrElse("null".getBytes))) .Распечатать()

person rsmin    schedule 27.06.2017
comment
как вы проверяли истинный формат данных в потоке? - person Paul; 11.07.2017

val broker = "221.181.73.44:19092"

Порт по умолчанию — 9092, проблема может быть в нем.

"auto.offset.reset" -> "earliest" и "enable.auto.commit" -> false всегда следует читать с начала логов вашей темы, так как ваши смещения нигде не хранятся. Так что с этим проблем нет.

Кроме того, можем ли мы увидеть полную команду, которую вы используете для kafka-console-consumer.sh ?

person Paul Leclercq    schedule 22.06.2017