Данные в таблице ksql не являются постоянными

Мы используем конфлюентную платформу на ubuntu. У нас есть простые данные JSON, отправляемые через запрос cURL на сервер kafka-rest в теме kafka с именем «UE_Context».

Поток kafka с именем "UE_CONTEXT_STREAM" создается для этой темы с помощью следующей команды:

CREATE STREAM UE_Context_Stream (ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', VALUE_FORMAT='JSON');

Таблица kafka с именем "UE_CONTEXT_TABLE" создается для этой темы с помощью следующей команды:

CREATE TABLE UE_Context_Table ( registertime BIGINT, ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', KEY='ue_key', VALUE_FORMAT='JSON');

У меня есть две строки данных, которые накачиваются по теме с помощью следующих команд cURL:

curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x1234", "ecgi" : "1234"}}]}' "http://localhost:8082/topics/UE_Context"  
curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x4321", "ecgi" : "4321"}}]}' "http://localhost:8082/topics/UE_Context"      

У меня есть запрос выбора, ожидающий в таблице, как показано ниже:

запрос ksql

Этот запрос отображает информацию о таблице, когда в тему закачиваются данные JSON. Затем мы прекращаем закачку данных JSON в тему, завершаем запрос выбора и завершаем запрос выбора. Если выбор выполняется позже, ранее заполненная информация таблицы не отображается. Нет возможности сохранить эти данные? Соединители Kafka и использование БД могут быть вариантом. Но разве в kSQL нет временной памяти для хранения информации о таблицах?


person Swaru    schedule 06.12.2018    source источник


Ответы (2)


выбор выполняется позже, ранее заполненная информация таблицы не отображается.

По умолчанию оператор select использует последние смещения темы.

Если вы хотите увидеть предыдущие данные, вам нужно вернуть смещение потребителя в начало

SET 'auto.offset.reset'='earliest';

Также, как указано в документации (с акцентом)

Оператор SELECT сам по себе является непостоянным непрерывным запросом. Результат оператора SELECT не сохраняется в теме Kafka и печатается только в консоли KSQL. Не путайте постоянные запросы, созданные с помощью CREATE STREAM AS SELECT, с результатами потокового запроса, полученными с помощью оператора SELECT.

person OneCricketeer    schedule 07.12.2018

Как указано в ksql github README:

ksqlDB позволяет вам определять материализованные представления ваших потоков и таблиц. Материализованные представления определяются так называемым постоянным запросом. Эти запросы известны как постоянные, потому что они поддерживают свои постепенно обновляемые результаты с помощью таблицы.

Теперь о материализованном представлении гораздо больше информации, чем о постоянных запросах, поэтому просто прочтите:

Преимущество материализованного представления состоит в том, что оно оценивает запрос только по изменениям (дельта), а не по всей таблице. ...

В ksqlDB таблица может быть материализована в представление или нет. Если таблица создается непосредственно поверх темы Kafka, она не материализуется. Невозможно запросить нематериализованные таблицы, потому что они были бы крайне неэффективными. С другой стороны, если таблица получена из другой коллекции, ksqlDB материализует ее результаты, и вы можете делать запросы к ней.

person yuranos    schedule 19.01.2021