Совместимость Spring Kafka, Spring Cloud Stream и Avro Неизвестный магический байт

У меня проблема с десериализацией сообщений из тем Kafka. Сообщения были сериализованы с помощью spring-cloud-stream и Apache Avro. Я читаю их с помощью Spring Kafka и пытаюсь десериализовать их. Если я использую spring -cloud как для создания, так и для потребления сообщений, тогда я могу десериализовать сообщения нормально. Проблема в том, что я использую их с помощью Spring Kafka, а затем пытаюсь десериализовать.

Я использую реестр схем (как реестр схем весенней загрузки для разработки, так и схему Confluent в производственной среде), но проблемы с десериализацией, похоже, возникают до того, как событие вызывает реестр схем.

Трудно опубликовать весь соответствующий код по этому вопросу, поэтому я разместил его в репозитории в git-хабе: https://github.com/robjwilkins/avro-example

Объект, который я отправляю по теме, - это просто pojo:

@Data
public class Request {
  private String message;
}

Код, который генерирует сообщения в Kafka, выглядит так:

@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {

  private final MessageChannels messageChannels;

  @GetMapping("/produce")
  public void produceMessage() {
    Request request = new Request();
    request.setMessage("hello world");
    Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
    log.debug("sending message");
    messageChannels.testRequest().send(requestMessage);
  }
}

и application.yaml:

spring:
  application.name: avro-producer
  kafka:
    bootstrap-servers: localhost:9092
    consumer.group-id: avro-producer
  cloud:
    stream:
      schema-registry-client.endpoint: http://localhost:8071
      schema.avro.dynamic-schema-generation-enabled: true
      kafka:
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
      bindings:
        test-request:
          destination: test-request
          contentType: application/*+avro

Потом у меня есть потребитель:

@Slf4j
@Component
public class TopicListener {

    @KafkaListener(topics = {"test-request"})
    public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
        log.info("listenForMessage. got a message: {}", consumerRecord);
        consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
    }

    private String asString(byte[] byteArray) {
        return new String(byteArray, Charset.defaultCharset());
    }
}

И проект, который потребляет, имеет конфигурацию application.yaml:

spring:
  application.name: avro-consumer
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: avro-consumer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        schema.registry.url: http://localhost:8071

Когда потребитель получает сообщение, возникает исключение:

2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Я прошел через код десериализации до точки, в которой возникает это исключение.

public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
....
private ByteBuffer getByteBuffer(byte[] payload) {
  ByteBuffer buffer = ByteBuffer.wrap(payload);
  if (buffer.get() != 0) {
    throw new SerializationException("Unknown magic byte!");
  } else {
    return buffer;
  }
}

Это происходит потому, что десериализатор проверяет байтовое содержимое сериализованного объекта (байтовый массив) и ожидает, что оно будет равно 0, однако это не так. Поэтому я сомневаюсь, что MessageConverter spring-cloud-stream, сериализовавший объект, совместим с объектом io.confluent, который я использую для десериализации объекта. И если они несовместимы, что мне делать?

спасибо за любую помощь.


person robjwilkins    schedule 30.01.2019    source источник
comment
Не редактируйте трассировку стека; показать все это. Cause by: сами по себе бесполезны.   -  person Gary Russell    schedule 30.01.2019
comment
Если в какой-то момент вы добавили в тему строковое (или не avro) значение, KafkaAvroDeserializer не сможет его прочитать ...   -  person OneCricketeer    schedule 31.01.2019
comment
@GaryRussell Я не редактировал трассировку стека - это все, что отображается на консоли. Также добавили дополнительное описание проблемы   -  person robjwilkins    schedule 31.01.2019
comment
@ cricket_007 - возможно, я опубликовал сообщение со строковой полезной нагрузкой, однако я сбросил смещения тем на самые последние, чтобы гарантировать, что старые сообщения не будут приняты.   -  person robjwilkins    schedule 31.01.2019
comment
Что ж, все, что я могу сказать, это неправильная трассировка стека; Помимо записей Caused by:, обычная трассировка стека содержит информацию о стеке вызовов (классы / методы / номера строк). Вот почему это называется трассировкой стека. Если вы не редактировали его, возможно, ваша подсистема ведения журнала настроена на подавление этой важной информации, что было бы действительно очень странно.   -  person Gary Russell    schedule 31.01.2019
comment
Все, что я действительно могу сказать, основано на моем опыте работы с этой ошибкой. Если вы перешли на последнее смещение, то потребитель ничего не прочитает, так как новых сообщений нет. Если вы снова запустили производитель, отправивший данные, и затем у потребителя такая же ошибка - проблема останется; сообщения не были сериализованы с помощью сериализаторов Confluent ... Я не знаю, что это за аннотация @Data, но она не используется в примерах Confluent github.com/confluentinc/examples/tree/5.1.0-post/clients/avro   -  person OneCricketeer    schedule 01.02.2019
comment
@Data - это аннотация Lombok - она ​​просто автоматически создает код геттера / сеттера   -  person robjwilkins    schedule 04.02.2019


Ответы (4)


Суть этой проблемы в том, что производитель использует spring-cloud-stream для отправки сообщений в Kafka, а потребитель использует spring-kaka. Причины этого:

  • Существующая система уже хорошо зарекомендовала себя и использует поток весенних облаков.
  • Новому потребителю необходимо прослушивать несколько тем, используя один и тот же метод, привязку только к списку имен тем в формате CSV.
  • Существует требование использовать коллекцию сообщений сразу, а не по отдельности, чтобы их содержимое можно было записать в базу данных массово.

Spring-cloud-stream не позволяет потребителю привязать слушателя к нескольким темам, и нет возможности использовать сразу набор сообщений (если я не ошибаюсь).

Я нашел решение, которое не требует каких-либо изменений в коде производителя, который использует spring-cloud-stream для публикации сообщений в Kafka. Spring-cloud-stream использует MessageConverter для управления сериализацией и десериализацией. В AbstractAvroMessageConverter есть методы: convertFromInternal и convertToInternal, которые обрабатывают преобразование в / из байтового массива. Мое решение состояло в том, чтобы расширить этот код (создать класс, расширяющий AvroSchemaRegistryClientMessageConverter), чтобы я мог повторно использовать большую часть функций spring-cloud-stream, но с интерфейсом, к которому можно получить доступ из моей spring-kafka KafkaListener. Затем я изменил свой TopicListener, чтобы использовать этот класс для преобразования:

Конвертер:

@Component
@Slf4j
public class AvroKafkaMessageConverter extends AvroSchemaRegistryClientMessageConverter {

  public AvroKafkaMessageConverter(SchemaRegistryClient schemaRegistryClient) {
    super(schemaRegistryClient, new NoOpCacheManager());
  }

  public <T> T convertFromInternal(ConsumerRecord<?, ?> consumerRecord, Class<T> targetClass,
      Object conversionHint) {
    T result;
    try {
      byte[] payload = (byte[]) consumerRecord.value();

      Map<String, String> headers = new HashMap<>();
      consumerRecord.headers().forEach(header -> headers.put(header.key(), asString(header.value())));

      MimeType mimeType = messageMimeType(conversionHint, headers);
      if (mimeType == null) {
        return null;
      }

      Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
      Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);

      @SuppressWarnings("unchecked")
      DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
      Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
      result = (T) reader.read(null, decoder);
    }
    catch (IOException e) {
      throw new RuntimeException("Failed to read payload", e);
    }
    return result;
  }

  private MimeType messageMimeType(Object conversionHint, Map<String, String> headers) {
    MimeType mimeType;
    try {
      String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
      log.debug("contentType: {}", contentType);
      mimeType = MimeType.valueOf(contentType);
    } catch (InvalidMimeTypeException e) {
      log.error("Exception getting object MimeType from contentType header", e);
      if (conversionHint instanceof MimeType) {
        mimeType = (MimeType) conversionHint;
      }
      else {
        return null;
      }
    }
    return mimeType;
  }

  private String asString(byte[] byteArray) {
    String theString = new String(byteArray, Charset.defaultCharset());
    return theString.replace("\"", "");
  }
}

Измененный TopicListener:

@Slf4j
@Component
@RequiredArgsConstructor
public class TopicListener {

  private final AvroKafkaMessageConverter messageConverter;

  @KafkaListener(topics = {"test-request"})
  public void listenForMessage(ConsumerRecord<?, ?> consumerRecord) {
    log.info("listenForMessage. got a message: {}", consumerRecord);
    Request request = messageConverter.convertFromInternal(
        consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avr"));
    log.info("request message: {}", request.getMessage());
  }
}

Это решение использует только одно сообщение за раз, но его можно легко изменить для обработки пакетов сообщений.

Полное решение находится здесь: https://github.com/robjwilkins/avro-example/tree/develop

person robjwilkins    schedule 04.02.2019

Вы должны явно определить десериализатор, создав DefaultKafkaConsumerFactory и свой TopicListener bean в конфигурации, примерно так:

@Configuration
@EnableKafka
public class TopicListenerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value(("${spring.kafka.consumer.group-id}"))
private String groupId;


@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.wilkins.avro.consumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

@Bean
public TopicListener topicListener() {
    return new TopicListener();
}
}
person TheRocket    schedule 30.01.2019
comment
Примечание. Вопрос связан с использованием Avro, а не JsonDeserializer, который не может прочитать эти данные. - person OneCricketeer; 31.01.2019

Вместо этого вы можете настроить привязку для использования Kafka Serializer изначально.

Установите для свойства производителя useNativeEncoding значение true и настройте сериализатор с помощью свойств ...producer.configuration Kafka.

ИЗМЕНИТЬ

Пример:

spring:
  cloud:
    stream:
# Generic binding properties
      bindings:
        input:
          consumer:
            use-native-decoding: true
          destination: so54448732
          group: so54448732
        output:
          destination: so54448732
          producer:
            use-native-encoding: true
# Kafka-specific binding properties
      kafka:
        bindings:
          input:
            consumer:
              configuration:
                value.deserializer: com.example.FooDeserializer
          output:
            producer:
              configuration:
                value.serializer: com.example.FooSerializer
person Gary Russell    schedule 31.01.2019
comment
Я предлагаю вам открыть проблему здесь с вашими предложениями по улучшению. - person Gary Russell; 31.01.2019
comment
Было бы очень полезно, если бы вы могли привести пример свойств производителя / потребителя, которые мне нужно было бы установить? Спасибо за вашу помощь. - person robjwilkins; 31.01.2019
comment
Спасибо за ответ. В моем примере Spring-cloud-stream не используется в качестве потребителя, но именно здесь у меня возникла проблема с десериализацией? - person robjwilkins; 01.02.2019
comment
Это не имеет значения; Для полноты картины я продемонстрировал использование собственных сериализаторов / десериализаторов с обеих сторон. Ваш вопрос касается совместимости конвертера сообщений и десериализатора avro. Использование нативного кодирования избавляет от необходимости конвертировать сообщения; полезная нагрузка отправляется клиенту Kafka напрямую как ProducerRecord.value, а сериализация выполняется Kafka; Весна в этом не участвует. Если у вас все еще возникают проблемы при использовании нативного кодирования, то проблема в другом, возможно, в самих данных. - person Gary Russell; 01.02.2019

Спасибо, что спас мне день, используя nativeencoding и spring: cloud: stream:

Общие свойства привязки

  bindings:
    input:
      consumer:
        use-native-decoding: true
      destination: so54448732
      group: so54448732
    output:
      destination: so54448732
      producer:
        use-native-encoding: true

Кафка-специфические связывающие свойства

  kafka:
    bindings:
      input:
        consumer:
          configuration:
            value.deserializer: com.example.FooDeserializer
      output:
        producer:
          configuration:
            value.serializer: com.example.FooSerializer
person user11546680    schedule 17.04.2020