Несколько типов событий в одной теме с Kafka, Quarkus и Avro

Я пытаюсь прочитать событие из темы с помощью Quarkus. Тема может содержать разные типы событий. Все события используют формат AVRO, поэтому у меня есть реестр схем, где я могу прочитать выпущенную схему события. Я использую avro-maven-plugin для компиляции схемы в класс Java.

Предположим, у нас есть два типа событий со следующими схемами:

Событие1

{
   "field1": "string"
}

Событие2

{
   "field2": "string"
}

В моем приложении меня интересует только один из них.

 public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> read(
        IncomingKafkaRecord<String, Event1> data) {
        System.out.println(data.getKey());
        System.out.println(data.getPayload());
        return data.ack();
    }
}

Этот код печатает все события, а не только события типа Event1, как я ожидал.

Когда я пытаюсь получить данные о событии data.getField1(), я получаю CastException.

java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.test.Event1 (org.apache.avro.generic.GenericData$Record is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @1aa7ecca; com.test.Event1 is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @1144a55a)

Есть способ прочитать только некоторые типы событий из темы с несколькими типами событий. Usign AVRO?


person Stefano Castoldi    schedule 14.10.2020    source источник


Ответы (1)


Возможное решение (немного словесное) - использовать SpecificRecord в качестве типа полезной нагрузки и установить specific.avro.reader=true в файле конфигурации приложения для входящих событий.

 public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> read(
        IncomingKafkaRecord<String, SpecificRecord> data) {
            String schemaFullName = data.getPayload().getSchema().getFullName();
            if (schemaFullName.equals(Event1.class.getName())) {
                System.out.println(((Event1) data.getPayload()).getField1());
            }
        return data.ack();
    }
}

После выбора события вы можете преобразовать SpecificRecord в правильный скомпилированный класс событий Avro.

person Stefano Castoldi    schedule 14.10.2020