Потоковая передача Spark и kafka Отсутствует необходимый раздел конфигурации partition.assignment.strategy, у которого нет значения по умолчанию

Я пытаюсь запустить приложение потоковой передачи искры с Kafka, используя пряжу. Я получаю следующую ошибку трассировки стека:

Вызвано: org.apache.kafka.common.config.ConfigException: Отсутствует требуемая конфигурация «partition.assignment.strategy», которая не имеет значения по умолчанию. на org.apache.kafka.common.config.ConfigDef.parse (ConfigDef.java:124) на org.apache.kafka.common.config.AbstractConfig. (AbstractConfig.java:48) на org.apache.kafka.clients. consumer.ConsumerConfig. (ConsumerConfig.java:194) на org.apache.kafka.clients.consumer.KafkaConsumer. (KafkaConsumer.java:380) на org.apache.kafka.clients.consumer.KafkaConsumer. (KafkaConsumer. (KafkaConsumer. (KafkaConsumer. ) на org.apache.kafka.clients.consumer.KafkaConsumer. (KafkaConsumer.java:350) на org.apache.spark.streaming.kafka010.CachedKafkaConsumer. (CachedKafkaConsumer.scala: 45) на org.apache.spark. kafka010.CachedKafkaConsumer $ .get (CachedKafkaConsumer.scala: 194) в org.apache.spark.streaming.kafka010.KafkaRDDIterator. (KafkaRDD.scala: 252) в org.apache.spark.streaming.KputeDka010. scala: 212) на org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) на org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) на org.apache.spark.r dd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 49) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в org.apache.spark.scheduler.Task.run (Task.scala: 109) в org.apache.spark.executor.Executor $ TaskRunner .run (Executor.scala: 345)

Вот фрагмент моего кода о том, как я создаю свой KafkaStream с искровым потоком -

        val ssc = new StreamingContext(sc, Seconds(60))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "*boorstrap_url:port*",
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.kerberos.service.name" -> "kafka",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "annotation-test",
  //Tried commenting and uncommenting this property      
  //"partition.assignment.strategy"->"org.apache.kafka.clients.consumer.RangeAssignor",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val topics = Array("*topic-name*")

val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))
val valueKafka = kafkaStream.map(record => record.value())

Я просмотрел следующий пост -

  1. https://issues.apache.org/jira/browse/KAFKA-4547
  2. Ошибка конфигурации Pyspark Structured Streaming Kafka

В соответствии с этим я обновил свой kafka util jar в своей толстой банке до версии 0.10.2.0 с 0.10.1.0, упакованной по умолчанию из spark-stream-kafka-jar как временный зависимость. Также моя работа отлично работает, когда я запускаю ее на одном узле, установив master как локальный. Я использую версию Spark 2.3.1.


person Y0gesh Gupta    schedule 13.03.2019    source источник
comment
Можете ли вы попробовать изменить свою стратегию на org.apache.kafka.clients.consumer.RoundRobinAssignor или вместо partition.assignment.strategy попробуйте установить consumer.partition.assignment.strategy   -  person suraj_fale    schedule 13.03.2019
comment
Я попробовал, затем получаю ошибку - java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign (Ljava / util / Collection;) V   -  person Y0gesh Gupta    schedule 13.03.2019
comment
попробуйте установить consumer.partition.assignment.strategy вместо partition.assignment.strategy.   -  person suraj_fale    schedule 13.03.2019
comment
Я получаю ту же ошибку - Отсутствует требуемый раздел конфигурации partition.assignment.strategy, который не имеет значения по умолчанию.   -  person Y0gesh Gupta    schedule 13.03.2019
comment
Думаю, этот вопрос может быть полезен: stackoverflow.com/questions/43035542/   -  person Bartosz Wardziński    schedule 13.03.2019
comment
Да, я пробовал это, но у меня нет другого клиента kafka в моей среде. Я также отправляю искровое задание с помощью --conf spark.executor.userClassPathFirst = true --conf spark.driver.userClassPathFirst = true, чтобы сначала проверить класс в моей толстой банке.   -  person Y0gesh Gupta    schedule 13.03.2019


Ответы (1)


Добавьте kafka-clients-*.jar в папку с искровым хранилищем. kafka-clients-*.jar находится в kafka-*/lib каталоге.

person Hassan Ahmadkhani    schedule 15.10.2019