В приложении потоковой обработки с использованием Spring Cloud Stream я беру входной поток (с целым числом) и вызываю для него selectKey
, чтобы создать новую тему с теми же значениями, но с другим ключом (строкой). В теме ввода есть записи в правильном формате JSON, например:
"key": {
"id": 1
},
"value": {
"id": 1,
"public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...
Проблема в том, что тема, созданная приложением потоковой обработки, имеет value
как строку, содержащую JSON, а не как правильный JSON, то есть:
"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"
Код выглядит следующим образом:
@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
stream.selectKey { _, value -> value.publicId }
Вышеупомянутая функция потребляет входной поток и генерирует выходной поток (отправляется на output
). Этот выходной поток имеет те же значения, что и входной поток, но просто другой ключ. (В этом случае ключ берется из свойства значения publicId
.)
application.yml
выглядит следующим образом:
spring.cloud.stream:
bindings:
input:
destination: input-topic
output:
destination: output-output
kafka:
streams:
binder:
application-id: test-app-id-1
bindings:
input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
Что-то мне не хватает? Действительно ли это проблема, или JSON можно хранить в виде строки в сообщениях, создаваемых Spring Cloud Stream?
Другие вещи, которые я пробовал, но которые не помогли:
- Использование собственного декодирования / кодирования
- Установка
spring.cloud.stream.bindings.output.content-type
наapplication/json
- Использование
map
вместоselectKey