KafkaStreamsStateStore не работает, если значением хранилища является Avro SpecificRecord

У меня есть приложение Spring Cloud Kafka Streams, которое использует StateStore в API процессора при использовании преобразователя для выполнения дедупликации.

Пары "ключ-значение" государственного хранилища бывают следующих типов: <String, TransferEmitted>.

При запуске приложения в момент помещения значения в хранилище состояний (dedupStore.put(key, value)) я получаю следующее исключение:

Вызвано: java.lang.ClassCastException: com.codependent.outboxpattern.account.TransferEmitted не может быть преобразован в java.lang.String

Это связано с тем, что значение serde по умолчанию для KafkaStreamsStateStore равно StringSerde.

Таким образом, я добавил параметр valueSerde в аннотацию KafkaStreamsStateStore, указав значение для SpecificAvroSerde:

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE,
            valueSerde = "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde")

Теперь я получаю исключение NullPointerException в AbstractKafkaAvroSerializer.serializeImpl, потому что в id = this.schemaRegistry.getId(subject, schema); schemaRegistry имеет значение null:

Вызвано: org.apache.kafka.common.errors.SerializationException: Ошибка сериализации сообщения Avro Вызвано: java.lang.NullPointerException в io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer) .kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:53) в io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize (SpecificAvroSerializer.java:65) на io.confluent.io.confluent.io.confluent. .SpecificAvroSerializer.serialize (SpecificAvroSerializer.java:38).

Несмотря на то, что реестр схемы настроен как компонент Spring ...

@Configuration
class SchemaRegistryConfiguration {

    @Bean
    fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
        val client = ConfluentSchemaRegistryClient()
        client.setEndpoint(endpoint)
        return client
    }

}

... когда Kafka устанавливает SpecificAvroSerde, он использует конструктор no-params, поэтому он не инициализирует клиент реестра схемы:

public class SpecificAvroSerde<T extends SpecificRecord> implements Serde<T> {
    private final Serde<T> inner;

    public SpecificAvroSerde() {
        this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
    }

    public SpecificAvroSerde(SchemaRegistryClient client) {
        if (client == null) {
            throw new IllegalArgumentException("schema registry client must not be null");
        } else {
            this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
        }
    }

Как мне настроить это приложение так, чтобы оно позволяло сериализовать StateStore<String, TransferEmitted>?

ВЫДЕРЖКИ ИЗ ПРОЕКТА (источник доступен по адресу https://github.com/codependent/kafka-outbox-pattern)

KStream

const val DEDUP_STORE = "dedup-store"

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        return input
                .transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
                .filter { _, value -> fraudDetectionService.isFraudulent(value) }

    }

}

Трансформатор

@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {

    private lateinit var dedupStore: KeyValueStore<String, TransferEmitted>
    private lateinit var context: ProcessorContext

    override fun init(context: ProcessorContext) {
        this.context = context
        dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, TransferEmitted>
    }

    override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
        return if (isDuplicate(key)) {
            null
        } else {
            dedupStore.put(key, value)
            KeyValue(key, value)
        }
    }

    private fun isDuplicate(key: String) = dedupStore[key] != null

    override fun close() {
    }
}

application.yml

spring:
  application:
    name: fraud-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: fraud-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              schema:
                registry:
                  url: http://localhost:8081
      bindings:
        input:
          destination: transfer
          contentType: application/*+avro
        output:
          destination: fraudulent-transfer
          contentType: application/*+avro

server:
  port: 8086

logging:
  level:
    org.springframework.cloud.stream: debug


comment
Можете ли вы отладить приложение и посмотреть, возникло ли это исключение из-за какого-либо пробела в привязке? Если да, поднимите проблему или внесите исправление. Не стесняйтесь пинговать gitter, если хотите пообщаться.   -  person sobychacko    schedule 17.06.2019


Ответы (1)


Я столкнулся с той же проблемой и забыл, что необходимо передать schema.registry.url, чтобы убедиться, что вы можете хранить записи Avro в своем хранилище состояний.

Например:

    @Bean
    public StoreBuilder eventStore(Map<String, String> schemaConfig) {
        final Duration windowSize = Duration.ofMinutes(DUPLICATION_WINDOW_DURATION);

        // retention period must be at least window size -- for this use case, we don't need a longer retention period
        // and thus just use the window size as retention time
        final Duration retentionPeriod = windowSize;

        // We have to specify schema.registry.url here, otherwise schemaRegistry value will end up null
        KafkaAvroSerializer serializer = new KafkaAvroSerializer();
        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        serializer.configure(schemaConfig, true);
        deserializer.configure(schemaConfig, true);

        final StoreBuilder<WindowStore<Object, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
                Stores.persistentWindowStore(STORE_NAME,
                        retentionPeriod,
                        windowSize,
                        false
                ),
                Serdes.serdeFrom(serializer, deserializer),
                // timestamp value is long
                Serdes.Long());
        return dedupStoreBuilder;
    }

    @Bean
    public Map<String, String> schemaConfig(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String url) {
        return Collections.singletonMap("schema.registry.url", "http://localhost:8081");
    }

Вот файл application.yml:

spring:
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081

После того, как я сделал это, я смог правильно настроить это хранилище и больше не видел NullPointerException.

person Vinayak Ponangi    schedule 19.04.2021