Как получить потоковый ответ от KSQL в Spring Kafka?

Как получить ответ по частям от сервера kafka KSQL в приложении загрузки java spring?

Когда я делаю вызов конечной точки /query, я просто получаю 1 строку, и соединение закрывается. Как я могу сохранить соединение и получать несколько строк?

Док говорит

Ответ передается обратно до тех пор, пока не будет достигнут LIMIT, указанный в операторе, или пока клиент не закроет соединение.

Как добиться этого в Java? Даже для KTable я получаю взамен только 1 строку.

https://docs.confluent.io/current/ksql/docs/developer-guide/api.html#run-a-query-and-stream-back-the-output


person Sergei Ledvanov    schedule 19.01.2019    source источник


Ответы (1)


Я смог обойти это следующим образом:

  • получить ответ в виде строки
  • построчно анализировать объекты JSON (KafkaQueryResponse - объект, представляющий 1 строку)

        ResponseEntity<String> result = template.exchange("/query",
            HttpMethod.POST,
            new HttpEntity<>(params, headers),
            String.class);
    
        List<KafkaQueryResponse> array = new ArrayList<>();
        JsonFactory jsonFactory = new JsonFactory();
        try(BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) {
            Iterator<KafkaQueryResponse> value = objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class);
            value.forEachRemaining(e -> {
                if (e.getRow() != null) {
                    array.add(e);
                }
            });
        }
        array <----  this is the list of JSON objects
    

KafkaQueryResponse

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class KafkaQueryResponse {
        private KafkaQueryRow row;
        private String finalMessage;
        private String errorMessage;

        @Data
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class KafkaQueryRow {
            private List<Object> columns;
        }
    }

Это решение не позволяет читать потоковый ответ по частям. Он ожидает, пока весь ответ поступит к клиенту, затем закрывает соединение и затем анализирует все объекты json.

person Sergei Ledvanov    schedule 20.01.2019