Вопросы по теме 'spring-cloud-stream-binder-kafka'

Spring Cloud Stream генерирует значение в виде строки, содержащей JSON, а не только JSON
В приложении потоковой обработки с использованием Spring Cloud Stream я беру входной поток (с целым числом) и вызываю для него selectKey , чтобы создать новую тему с теми же значениями, но с другим ключом (строкой). В теме ввода есть записи в...
568 просмотров

Поток ветки Spring Cloud Stream не работает должным образом
Ниже приведен код ветвления, он направлен только на одну тему (первую). Как я понял, он должен стримить по всем трем темам? В любом случае я могу транслировать по трем темам с помощью ветки? @Bean public Function<KStream<String,...
266 просмотров

Материализовать KStream в глобальное хранилище?
Я использую API Kafka Streams в приложении Java (Spring Cloud Stream). У меня есть конкретный пример использования, а именно: Мое приложение будет потреблять из темы A и производить и потреблять из темы B. Для каждого сообщения по теме A...
372 просмотров

Невозможно установить groupId и clientId при использовании Spring Cloud Stream Kafka Binder
У меня серьезные проблемы с Spring Cloud Stream Kafka Binder. Существует много неоднозначности и проблем с согласованностью, связанных с настройками конфигурации Spring Cloud 3.0.2.RELEASE. Я пытался установить идентификаторы групп и идентификаторы...
1460 просмотров

Spring Cloud Stream: несколько связывателей для Kafka Producer и Consumer с отдельной конфигурацией jaas не работают вместе
Я пытаюсь реализовать потребителя Kafka и производителя Kafka в одном приложении загрузки Spring с использованием облака Spring и связующего. Оба работают успешно, если выполняются по отдельности, но если они выполняются вместе, только Kafka Producer...
1883 просмотров

Как справиться с ошибкой сериализации в связывателе потоков Spring Cloud Stream Kafka?
Я пишу приложение потоков Kafka, используя связку потоков Kafka Stream Cloud Spring. Пока потребитель публикует сообщение в теме вывода, может возникнуть ошибка, например Ошибка сериализации или Ошибка сети . В этом коде - @Bean public...
290 просмотров

Spring Cloud Stream - изменение сообщений DLQ
Я использую функцию DLQ Spring Cloud Stream со связывателем Kafka. Когда обработка сообщения завершается неудачно, сообщение отправляется в DLQ, как и ожидалось, однако я хочу иметь возможность изменить сообщение, отправляемое в DLQ, чтобы включить...
247 просмотров

Приложение Spring Boot с использованием Spring Cloud Stream Kafka Binder + Kafka Streams Binder не работает - Producer не отправляет сообщения
В моем приложении Spring Boot 2.3.1 с SCS Hoshram.SR6 использовалось приложение Kafka Streams Binder. Мне нужно было добавить производитель Kafka, который будет использоваться в другой части приложения, поэтому я добавил связыватель kafka. Проблема...
2268 просмотров

облачный поток spring -boot с Kafka в качестве отдельной библиотеки
Я пытаюсь интегрировать библиотеку, основанную на облачном потоке с весенней загрузкой, с Kafka в приложении, отличном от Spring. Когда эта библиотека загружается в другое приложение Spring, все работает. Когда я пытаюсь инициализировать контекст...
47 просмотров

Есть ли способ опросить сообщения из темы с использованием подхода Spring Cloud Function?
Мы используем SubscribableChannel , MessageChannel и PollableMessageSource и настроены с использованием @EnableBinding и @StreamListner . Теперь нужно перейти к функциональному подходу. SubscribableChannel и MessageChannel можно...
259 просмотров

Включить/отключить привязку весеннего облачного потока на основе условных свойств
Создал библиотеку, в которой есть один потребитель облачного потока и производитель облачного потока, и эта библиотека внедрена во все микросервисы в качестве зависимости, это отлично работает для меня. У меня есть требование, чтобы некоторые...
121 просмотров

Как отложить инициализацию Spring Cloud Stream StreamListener?
У меня проблема с инициализацией StreamListener. Не могу решить свою проблему. Я использую в своем проекте Spring Cloud Stream Kafka и Spring Cache. Spring Cache инициализируется после метода start () SmartLifeCycle. Но StreamListener начинает...
206 просмотров

Функциональный подход Spring Cloud Stream: при преобразовании сообщений создается объект с пустыми значениями полей
Я пытаюсь создать реактивное приложение Spring Cloud Stream с kafka, следуя функциональному подходу (Spring Boot: 2.3.4, SC: Hoxton.SR9, SC Stream: 3.0.9, SC Function 3.0.11). Проблема: автоматически десериализованный объект имеет пустые значения...
201 просмотров

Spring Cloud Stream - несколько функций
Можно ли использовать 2 функции, где выход первой функции является входом второй? Мои функции: @Configuration public class StringStream { @Bean public Supplier<Flux<String>> stringSupplier() { return () ->...
484 просмотров

Kafka Streams: определите несколько потоков Kafka с помощью Spring Cloud Stream для каждого набора тем.
Я пытаюсь сделать простой POC с помощью Kafka Streams. Однако я получаю исключение при запуске приложения. Я использую Spring-Kafka, Kafka-Streams 2.5.1 с конфигурацией потока Spring boot 2.3.5 Kafka @Configuration public class KafkaStreamsConfig...
1214 просмотров

Новая потребительская группа выбирает сообщения с самого начала
У меня был потребитель Kafka, аннотированный StreamListener без идентификатора группы. Теперь, когда я добавил идентификатор группы через spring.cloud.stream.bindings.input.group , он начал обрабатывать все старые сообщения с самого начала, которые...
33 просмотров

Spring Cloud Stream Kafka Stream Binder, MessageChannel не создан
У меня проблема с моим простым тестовым приложением. Я хотел бы создать потребителя с связывателем потока kafka, например this . @SpringBootApplication public class CloudStreamAggregatorApplication { public static void main(String[] args) {...
232 просмотров

Spring Cloud Stream Kafka отправить сообщение
Как я могу отправить сообщение с новой функциональной моделью Spring Cloud Stream Kafka? Устаревший способ выглядел так. public interface OutputTopic { @Output("output") MessageChannel output(); } @Autowired OutputTopic...
283 просмотров

spring-cloud-stream с функциональным программированием
в моем приложении сообщения kafka использовались streamlistner из нескольких тем, но я хочу добавить функциональный потребительский компонент для одной темы. Возможно ли, что некоторые темы потребляются streamlistner, а некоторые - потребительским...
54 просмотров

Spring Cloud Stream опрашиваемый потребительский dlq и errorChannel не работают, если используется другой поток
Чтобы управлять длительной задачей с помощью Spring Cloud Stream 3.1.1 со связывателем Kafka, нам нужно использовать Pollable Consumer для управления потреблением вручную в отдельном потоке, чтобы Kafka не запускал ребалансировку. Для этого мы...
130 просмотров