Отсутствует пользовательский заголовок Avro при использовании потоковой передачи Spark SQL

Перед отправкой Avro GenericRecord в Kafka заголовок вставляется следующим образом.

ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, key, message);
record.headers().add("schema", schema);

Потребление записи.

При использовании Spark Streaming заголовок из ConsumerRecord остается нетронутым.

    KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)).foreachRDD(rdd -> {
          rdd.foreach(record -> {

            System.out.println(new String(record.headers().headers("schema").iterator().next().value()));
          });
        });
    ;

Но при использовании Spark SQL Streaming заголовок отсутствует.

   StreamingQuery query = dataset.writeStream().foreach(new ForeachWriter<>() {

      ...

      @Override
      public void process(Row row) {
        String topic = (String) row.get(2);
        int partition = (int) row.get(3);
        long offset = (long) row.get(4);
        String key = new String((byte[]) row.get(0));
        byte[] value = (byte[]) row.get(1);

        ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>(topic, partition, offset, key,
            value);

        //I need the schema to decode the Avro!

      }
    }).start();

Где я могу найти настраиваемое значение заголовка при использовании подхода Spark SQL Streaming?

Версия:

<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>

ОБНОВЛЕНИЕ

Я попробовал 3.0.0-preview2 для spark-sql_2.12 и spark-sql-kafka-0-10_2.12. я добавил

.option("includeHeaders", true)

Но я по-прежнему получаю эти столбцы только из строки.

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+



Ответы (1)


Заголовки Kafka в Structured Streaming поддерживаются только начиная с версии 3.0: https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html Дополнительные сведения см. в разделе includeHeaders.

person Gabor Somogyi    schedule 09.06.2020
comment
Что касается 3.0.0-preview2, схема Row еще не включает заголовки? Я получаю исключение в потоке main. - person Dale Angus; 10.06.2020
comment
См. код: github.com/apache/spark/blob/ - person Gabor Somogyi; 10.06.2020
comment
Нужно установить includeHeaders иначе не работает. - person Gabor Somogyi; 10.06.2020
comment
Хм, как я вижу, вы уже добавили includeHeaders. Я посмотрю на это глубже и исправлю, если внутри есть ошибка... - person Gabor Somogyi; 10.06.2020
comment
Не уверен, в чем проблема на вашей стороне, но здесь все работает безупречно: /org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L1655" rel="nofollow noreferrer">github.com/gaborgsomogyi/spark/blob/ Когда я выполнил тест, появилась следующая запись: aaaaaaaaaaaa: WrappedArray([a,[B@3fcf9f74], [c,[B@2bd9204b]) Пожалуйста, проанализируйте разницу между вашим приложением и упомянутым кодом Spark. - person Gabor Somogyi; 10.06.2020
comment
Получил работу! Но столкнулся с описанной здесь ошибкой. issues.apache.org/jira/browse/SPARK-30495 - person Dale Angus; 10.06.2020