Зачем мне нужно создавать потребителя Kafka для подключения к реестру схем?

Предыдущее примечание: я новичок в Kafka.

Я пытаюсь получить все схемы из реестра схем, но не могу сделать это только с помощью клиента реестра схем. Это работает, только если до этого я создаю экземпляр KafkaConsumer.

Не понимаю почему. Вот код (с потребителем на месте).

ConsumerConfig - это просто класс со всеми необходимыми конфигурациями. Включая URL-адрес реестра схемы.

Consumer<String, String>  consumer = new KafkaConsumer<String, String>(ConsumerConfig.get());
CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(ConsumerConfig.getSchemaRegistryURL(), 30);
Collection<String> listOfSubjects = client.getAllSubjects();
consumer.close();

Без потребителя я получаю:

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: нет содержимого для сопоставления из-за конца ввода

С потребителем все нормально работает. Я хотел бы, чтобы кто-то мог пролить свет на то, почему это происходит, если я не вижу причин, по которым мне нужно подключаться к фактическому кластеру Kafka через потребителя, чтобы получить доступ к реестру схемы, который находится на другой конечной точке.


person LeYAUable    schedule 02.01.2019    source источник
comment
Создание потребителя не даст вам всех схем, будет возможно получить только схемы для темы, которую вы назначили потребителю.   -  person OneCricketeer    schedule 03.01.2019
comment
Есть ли ошибки в журнале реестра схем? Получаете ли вы правильный ответ при вызове реестра схем с помощью curl - curl http://localhost:8081/subjects?   -  person belo    schedule 04.01.2019
comment
@ cricket_007 Я могу получить информацию о каждой схеме без дальнейшего кодирования, касающегося создания экземпляра потребителя.   -  person LeYAUable    schedule 06.01.2019
comment
@belo Да, я тестировал его на почтальоне, и он отлично работает   -  person LeYAUable    schedule 06.01.2019
comment
Простите, я не понимаю. Потребитель не общается с реестром. Десериализатор делает. Опять же, Потребитель не нужен для получения предметов или схем (как показано ниже). Кроме того, вы не указали, какую версию Клиентов или Kafka вы используете. Если вы думаете, что это действительно ошибка, не стесняйтесь размещать информацию о Schema Registry Github.   -  person OneCricketeer    schedule 07.01.2019
comment
Да, я также нахожу это странным именно по тем же причинам, которые вы только что указали ... в этом нет необходимости, но, как ни странно, без него он не работает. Я буду исследовать дальше.   -  person LeYAUable    schedule 07.01.2019


Ответы (1)


Вам вообще не нужно создавать экземпляр KafkaConsumer. Оба полностью независимы.

Если вы просто хотите получить все предметы и схему из SchemaRegistry, просто создайте экземпляр CachedSchemaRegistryClient и вызовите связанную операцию.

Вот рабочий пример:

 private final static Map<String, Schema> schemas = new ConcurrentHashMap<>();
 protected static SchemaRegistryClient schemaRegistryClient;

 public static void main(String[] args) {
       String registryUrl = "http://localhost:8081";
        try {
            schemaRegistryClient = new CachedSchemaRegistryClient(registryUrl, 30);
            System.out.println(schemaRegistryClient);
            Collection<String> subjects = schemaRegistryClient.getAllSubjects();
            System.out.println(subjects);
        } catch (Exception e){
            throw new RuntimeException(e);
        }
    }
person Nishu Tayal    schedule 02.01.2019
comment
Что ж, это также было моим пониманием и тем, что я делал в опубликованном мной коде (если вы удалите создание и закрытие KafkaConsumer). Но я получаю указанную мной ошибку. - person LeYAUable; 03.01.2019