Spring Cloud Stream Kafka ›получение сообщений Avro из прокси-сервера Confluent REST

У меня такой сценарий:

  • Производитель отправляет сообщения в кодировке Avro в тему Kafka через прокси-сервер Confluent REST (который регистрирует схему в реестре схем Confluent), как описано в http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce-and-consumer-avro-messages
  • Сообщение с включенным Spring Cloud Stream прослушивает тему на предмет новых сообщений

Мое приложение выглядит так:

@SpringBootApplication
@EnableBinding(Sink.class)
public class MyApplication {
  private static Logger log = LoggerFactory.getLogger(MyApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void myMessageSink(MyMessage message) {
    log.info("Received new message: {}", message);
  }
}

В то время как MyMessage - это класс, созданный Avro из схемы Avro.

Мой application.properties выглядит так:

spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=${spring.application.name}
spring.cloud.stream.bindings.input.contentType=application/*+avro

Моя проблема теперь в том, что каждый раз при получении нового сообщения выдается следующее исключение:

org.springframework.messaging.MessagingException: Exception thrown while invoking MyApplication#myMessageSink[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:316) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    ...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1]
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:307) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    ... 35 common frames omitted

Насколько я понимаю, проблема в том, что стек Confluent включает идентификатор схемы сообщения как часть полезной нагрузки сообщения, и ожидается, что клиенты начнут читать фактическое сообщение Avro после идентификатора схемы. Кажется, мне нужно настроить привязку Kafka для использования KafkaAvroDeserializer Confluent, но я не могу понять, как этого добиться.

(Я могу отлично извлекать сообщения с помощью консольного потребителя Confluent avro, поэтому, похоже, это не проблема с кодировкой Avro)

Я также попытался поиграть с аннотацией @EnableSchemaRegistry и настроить bean-компонент ConfluentSchemaRegistryClient, но мне кажется, что это контролирует только то, где схемы хранятся / извлекаются, но не фактическая десериализация.

Это вообще должно как-то работать?


person neptoon    schedule 09.10.2016    source источник


Ответы (2)


Работает ли это при установке свойства per-binding spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer на Confluent's KafkaAvroDeserializer class name?

person Ilayaperumal Gopinathan    schedule 09.10.2016
comment
Похоже, что конечная точка потребителя KafkaMessageChannelBinder всегда использует ByteArrayDeserializer для сериализаторов "ключ-значение". Это может быть результатом того, что производитель KafkaMessageChannelBinder по умолчанию использует ByteArraySerializer. В случае производителей, не использующих Spring Cloud Stream, потребитель должен иметь возможность переопределить требуемый десериализатор, используя указанное выше свойство. - person Ilayaperumal Gopinathan; 09.10.2016
comment
К сожалению, нет, я уже пробовал это. Из того, что я видел в исходный код десериализатор жестко запрограммирован как ByteArrayDeserializer по умолчанию. - person neptoon; 09.10.2016
comment
да, создайте проблему здесь: github.com/spring- облако / весна-облако-поток-связыватель-кафка / вопросы. Мы можем отследить это оттуда. Спасибо! - person Ilayaperumal Gopinathan; 09.10.2016
comment
Созданная проблема: github.com/spring-cloud/spring -cloud-stream-binder-kafka / issues / - person neptoon; 09.10.2016

Что-то вроде ответа на свой вопрос. На данный момент я реализовал MessageConverter, который просто удаляет первые 4 байта любого сообщения перед их передачей в декодер Avro. Код в основном взят из AbstractAvroMessageConverter spring-cloud-stream:

public class ConfluentAvroSchemaMessageConverter extends AvroSchemaMessageConverter {

public ConfluentAvroSchemaMessageConverter() {
    super(new MimeType("application", "avro+confluent"));
}

@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
    Object result = null;
    try {
        byte[] payload = (byte[]) message.getPayload();

        // byte array to contain the message without the confluent header (first 4 bytes)
        byte[] payloadWithoutConfluentHeader = new byte[payload.length - 4];
        ByteBuffer buf = ByteBuffer.wrap(payload);
        MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders());
        if (mimeType == null) {
            if (conversionHint instanceof MimeType) {
                mimeType = (MimeType) conversionHint;
            }
            else {
                return null;
            }
        }

        // read first 4 bytes and copy the rest to the new byte array
        // see https://groups.google.com/forum/#!topic/confluent-platform/rI1WNPp8DJU
        buf.getInt();
        buf.get(payloadWithoutConfluentHeader);

        Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
        Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);
        DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(payloadWithoutConfluentHeader, null);
        result = reader.read(null, decoder);
    }
    catch (IOException e) {
            throw new MessageConversionException(message, "Failed to read payload", e);
    }
    return result;

}

Затем я установил тип контента для входящей темы Kafka в application / avro + confluent через application.properties.

Это, по крайней мере, позволяет мне получать сообщения, закодированные с помощью стека Confluent, но, конечно, это никак не взаимодействует с реестром схемы.

person neptoon    schedule 09.10.2016