Spring Cloud Stream генерирует значение в виде строки, содержащей JSON, а не только JSON

В приложении потоковой обработки с использованием Spring Cloud Stream я беру входной поток (с целым числом) и вызываю для него selectKey, чтобы создать новую тему с теми же значениями, но с другим ключом (строкой). В теме ввода есть записи в правильном формате JSON, например:

"key": {
  "id": 1
},
"value": {
  "id": 1,
  "public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...

Проблема в том, что тема, созданная приложением потоковой обработки, имеет value как строку, содержащую JSON, а не как правильный JSON, то есть:

"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"

Код выглядит следующим образом:

@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
         stream.selectKey { _, value -> value.publicId }

Вышеупомянутая функция потребляет входной поток и генерирует выходной поток (отправляется на output). Этот выходной поток имеет те же значения, что и входной поток, но просто другой ключ. (В этом случае ключ берется из свойства значения publicId.)

application.yml выглядит следующим образом:

spring.cloud.stream:
  bindings:
    input:
      destination: input-topic
    output:
      destination: output-output
  kafka:
    streams:
      binder:
        application-id: test-app-id-1
      bindings:
        input:
          consumer:
            keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
        output:
          producer:
            keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde

Что-то мне не хватает? Действительно ли это проблема, или JSON можно хранить в виде строки в сообщениях, создаваемых Spring Cloud Stream?

Другие вещи, которые я пробовал, но которые не помогли:

  • Использование собственного декодирования / кодирования
  • Установка spring.cloud.stream.bindings.output.content-type на application/json
  • Использование map вместо selectKey

person Yoni Gibbs    schedule 24.10.2019    source источник


Ответы (1)


Это означает, что вы отправляете publicId: "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a" как строку вместо POJO.

Если это то, что вы отправляете, вам следует использовать StringSerde, а не JsonSerde.

ИЗМЕНИТЬ

Я только что протестировал его с помощью приложения Java, и он работает, как ожидалось ...

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class So58538297Application {

    public static void main(String[] args) {
        SpringApplication.run(So58538297Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
        return stream.selectKey((key, value) -> value.getBar());
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        ObjectMapper mapper = new ObjectMapper();
        return args -> {
            template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
        };
    }

    @KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
    public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("out:" + in + ", key:" + key);
    }

    @KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
    public void in(String in) {
        System.out.println("in:" + in);
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

    }

}

и

spring.application.name=so58538297

spring.kafka.consumer.auto-offset-reset=earliest

и

in:{"bar":"baz"}
out:{"bar":"baz"}, key:baz
person Gary Russell    schedule 24.10.2019
comment
Десериализация работает нормально. Проблема в сериализации. Я просматриваю данные, сериализованные в тему Kafka (с использованием центра консоли Kafka или пользовательского интерфейса Kafka Topics от Landoop), и объект сериализуется как строка. Строка, которая сериализуется как действительный JSON, но это строка со всем, что в ней экранировано, а не необработанный JSON, как в исходной теме (с данными, созданными, в этом примере, Debezium). Также в коде, который вы связали, он показывает stream.through с двумя параметрами serde: из того, что я вижу, through необходимо указать название темы. - person Yoni Gibbs; 24.10.2019
comment
Извините - пропустил; StringSerde предназначен для ключа; то, что вы наблюдаете, произойдет, если вы публикуете String вместо POJO. - person Gary Russell; 24.10.2019
comment
Итак, как мне заставить его опубликовать как JSON-представление POJO вместо строки, содержащей этот JSON? Я тоже пробовал использовать собственное кодирование / декодирование, и там тоже не повезло. - person Yoni Gibbs; 24.10.2019
comment
Используйте StringSerde вместо JsonSerde. - person Gary Russell; 24.10.2019
comment
Но я хочу именно JSON. Это не просто одно поле (publicId): это весь POJO, который я хочу сохранить как JSON. Чтобы уточнить: реальный объект в сообщении я не хочу менять: это просто ключ. - person Yoni Gibbs; 24.10.2019
comment
Ваш вопрос сбивает с толку; вы показываете потребителя потока, который сопоставляет его с value -> value.publicId - ваша проблема связана с тем, что создает запись; вам нужно это показать. - person Gary Russell; 24.10.2019
comment
Я показал это: функция process потребляет и производит. См. Аннотацию @SendTo("output"). Также он не сопоставляет значение с publicId: он использует selectKey, поэтому устанавливает ключ на publicId. Стоимость остается неизменной. Я обновил свой вопрос, чтобы попытаться прояснить это: извиняюсь, если это сбивает с толку. - person Yoni Gibbs; 24.10.2019
comment
Извините, теперь я вижу javadoc для selectKey Я понимаю, что должен делать ваш процессор. Вы просто отправляете то же значение, но с новым ключом. Итак, как-то происходит двойное преобразование JSON. Я предлагаю вам запустить отладчик, чтобы понять, почему. - person Gary Russell; 24.10.2019
comment
Хммм - я только что протестировал его с приложением Java и (Boot 2.2, Hoxton.M3), и он работал, как ожидалось (см. Мою правку). Какие версии вы используете? - person Gary Russell; 24.10.2019
comment
Спасибо. Я провел еще несколько исследований, и на самом деле похоже, что данные, отображаемые в виде строки (а не JSON), происходят только в пользовательском интерфейсе Landoop Kafka Topics. Я обновился до последних версий и ничего не изменилось. Но я начинаю подозревать, что это больше связано с пользовательским интерфейсом Landoop. Консоль Kafka Consumer показывает данные нормально, и запись других потоковых процессоров для приема данных, созданных исходным потоковым процессором, похоже, тоже работает нормально. Так что вроде все работает. Спасибо за помощь. - person Yoni Gibbs; 24.10.2019