У меня проблема с десериализацией сообщений из тем Kafka. Сообщения были сериализованы с помощью spring-cloud-stream и Apache Avro. Я читаю их с помощью Spring Kafka и пытаюсь десериализовать их. Если я использую spring -cloud как для создания, так и для потребления сообщений, тогда я могу десериализовать сообщения нормально. Проблема в том, что я использую их с помощью Spring Kafka, а затем пытаюсь десериализовать.
Я использую реестр схем (как реестр схем весенней загрузки для разработки, так и схему Confluent в производственной среде), но проблемы с десериализацией, похоже, возникают до того, как событие вызывает реестр схем.
Трудно опубликовать весь соответствующий код по этому вопросу, поэтому я разместил его в репозитории в git-хабе: https://github.com/robjwilkins/avro-example
Объект, который я отправляю по теме, - это просто pojo:
@Data
public class Request {
private String message;
}
Код, который генерирует сообщения в Kafka, выглядит так:
@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {
private final MessageChannels messageChannels;
@GetMapping("/produce")
public void produceMessage() {
Request request = new Request();
request.setMessage("hello world");
Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
log.debug("sending message");
messageChannels.testRequest().send(requestMessage);
}
}
и application.yaml:
spring:
application.name: avro-producer
kafka:
bootstrap-servers: localhost:9092
consumer.group-id: avro-producer
cloud:
stream:
schema-registry-client.endpoint: http://localhost:8071
schema.avro.dynamic-schema-generation-enabled: true
kafka:
binder:
brokers: ${spring.kafka.bootstrap-servers}
bindings:
test-request:
destination: test-request
contentType: application/*+avro
Потом у меня есть потребитель:
@Slf4j
@Component
public class TopicListener {
@KafkaListener(topics = {"test-request"})
public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
log.info("listenForMessage. got a message: {}", consumerRecord);
consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
}
private String asString(byte[] byteArray) {
return new String(byteArray, Charset.defaultCharset());
}
}
И проект, который потребляет, имеет конфигурацию application.yaml:
spring:
application.name: avro-consumer
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: avro-consumer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url: http://localhost:8071
Когда потребитель получает сообщение, возникает исключение:
2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Я прошел через код десериализации до точки, в которой возникает это исключение.
public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
....
private ByteBuffer getByteBuffer(byte[] payload) {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != 0) {
throw new SerializationException("Unknown magic byte!");
} else {
return buffer;
}
}
Это происходит потому, что десериализатор проверяет байтовое содержимое сериализованного объекта (байтовый массив) и ожидает, что оно будет равно 0, однако это не так. Поэтому я сомневаюсь, что MessageConverter spring-cloud-stream, сериализовавший объект, совместим с объектом io.confluent, который я использую для десериализации объекта. И если они несовместимы, что мне делать?
спасибо за любую помощь.
Cause by:
сами по себе бесполезны. - person Gary Russell   schedule 30.01.2019KafkaAvroDeserializer
не сможет его прочитать ... - person OneCricketeer   schedule 31.01.2019Caused by:
, обычная трассировка стека содержит информацию о стеке вызовов (классы / методы / номера строк). Вот почему это называется трассировкой стека. Если вы не редактировали его, возможно, ваша подсистема ведения журнала настроена на подавление этой важной информации, что было бы действительно очень странно. - person Gary Russell   schedule 31.01.2019@Data
, но она не используется в примерах Confluent github.com/confluentinc/examples/tree/5.1.0-post/clients/avro - person OneCricketeer   schedule 01.02.2019