У меня есть приложение Spring Cloud Kafka Streams, которое использует StateStore в API процессора при использовании преобразователя для выполнения дедупликации.
Пары "ключ-значение" государственного хранилища бывают следующих типов: <String, TransferEmitted>
.
При запуске приложения в момент помещения значения в хранилище состояний (dedupStore.put(key, value)
) я получаю следующее исключение:
Вызвано: java.lang.ClassCastException: com.codependent.outboxpattern.account.TransferEmitted не может быть преобразован в java.lang.String
Это связано с тем, что значение serde по умолчанию для KafkaStreamsStateStore
равно StringSerde
.
Таким образом, я добавил параметр valueSerde в аннотацию KafkaStreamsStateStore
, указав значение для SpecificAvroSerde
:
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE,
valueSerde = "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde")
Теперь я получаю исключение NullPointerException в AbstractKafkaAvroSerializer.serializeImpl
, потому что в id = this.schemaRegistry.getId(subject, schema);
schemaRegistry имеет значение null:
Вызвано: org.apache.kafka.common.errors.SerializationException: Ошибка сериализации сообщения Avro Вызвано: java.lang.NullPointerException в io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer) .kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:53) в io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize (SpecificAvroSerializer.java:65) на io.confluent.io.confluent.io.confluent. .SpecificAvroSerializer.serialize (SpecificAvroSerializer.java:38).
Несмотря на то, что реестр схемы настроен как компонент Spring ...
@Configuration
class SchemaRegistryConfiguration {
@Bean
fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
val client = ConfluentSchemaRegistryClient()
client.setEndpoint(endpoint)
return client
}
}
... когда Kafka устанавливает SpecificAvroSerde
, он использует конструктор no-params, поэтому он не инициализирует клиент реестра схемы:
public class SpecificAvroSerde<T extends SpecificRecord> implements Serde<T> {
private final Serde<T> inner;
public SpecificAvroSerde() {
this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
}
public SpecificAvroSerde(SchemaRegistryClient client) {
if (client == null) {
throw new IllegalArgumentException("schema registry client must not be null");
} else {
this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
}
}
Как мне настроить это приложение так, чтобы оно позволяло сериализовать StateStore<String, TransferEmitted>
?
ВЫДЕРЖКИ ИЗ ПРОЕКТА (источник доступен по адресу https://github.com/codependent/kafka-outbox-pattern)
KStream
const val DEDUP_STORE = "dedup-store"
@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
return input
.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
.filter { _, value -> fraudDetectionService.isFraudulent(value) }
}
}
Трансформатор
@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {
private lateinit var dedupStore: KeyValueStore<String, TransferEmitted>
private lateinit var context: ProcessorContext
override fun init(context: ProcessorContext) {
this.context = context
dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, TransferEmitted>
}
override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
return if (isDuplicate(key)) {
null
} else {
dedupStore.put(key, value)
KeyValue(key, value)
}
}
private fun isDuplicate(key: String) = dedupStore[key] != null
override fun close() {
}
}
application.yml
spring:
application:
name: fraud-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
streams:
binder:
configuration:
application:
id: fraud-service
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
schema:
registry:
url: http://localhost:8081
bindings:
input:
destination: transfer
contentType: application/*+avro
output:
destination: fraudulent-transfer
contentType: application/*+avro
server:
port: 8086
logging:
level:
org.springframework.cloud.stream: debug