Я использую Spring Cloud Stream и хочу программно создавать и связывать каналы. В моем случае при запуске приложения я получаю динамический список тем Kafka, на которые можно подписаться. Как я могу создать канал для каждой темы?
Динамические каналы Spring Cloud Stream
Ответы (5)
Недавно я столкнулся с подобным сценарием, и ниже мой пример динамического создания SubscriberChannels.
ConsumerProperties consumerProperties = new ConsumerProperties();
consumerProperties.setMaxAttempts(1);
BindingProperties bindingProperties = new BindingProperties();
bindingProperties.setConsumer(consumerProperties);
bindingProperties.setDestination(retryTopic);
bindingProperties.setGroup(consumerGroup);
bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
beanFactory.registerSingleton(consumerName, channel);
channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
bindingService.bindConsumer(channel, consumerName);
channel.subscribe(consumerMessageHandler);
Мне пришлось сделать что-то подобное для Компонент Camel Spring Cloud Stream. Возможно, вам будет полезен код потребителя для привязки пункта назначения "на самом деле просто String
, указывающий имя канала"?
В моем случае я привязываю только один пункт назначения, однако я не думаю, что он концептуально сильно отличается для нескольких пунктов назначения.
Вот его суть:
@Override
protected void doStart() throws Exception {
SubscribableChannel bindingTarget = createInputBindingTarget();
bindingTarget.subscribe(message -> {
// have your way with the received incoming message
});
endpoint.getBindingService().bindConsumer(bindingTarget,
endpoint.getDestination());
// at this point the binding is done
}
/**
* Create a {@link SubscribableChannel} and register in the
* {@link org.springframework.context.ApplicationContext}
*/
private SubscribableChannel createInputBindingTarget() {
SubscribableChannel channel = endpoint.getBindingTargetFactory()
.createInputChannel(endpoint.getDestination());
endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
endpoint.getDestination());
return channel;
}
См. здесь, где вы найдете полный исходный текст и более подробный контекст.
У меня была задача, где я не знал заранее темы. Я решил это, имея один входной канал, который слушает все нужные мне темы.
Место назначения
Целевое назначение канала в связанном промежуточном программном обеспечении (например, обмен RabbitMQ или тема Kafka). Если канал привязан как потребитель, он может быть привязан к нескольким адресатам, а имена адресатов могут быть указаны как строковые значения, разделенные запятыми. Если не установлен, вместо него используется имя канала.
Итак, моя конфигурация
spring:
cloud:
stream:
default:
consumer:
concurrency: 2
partitioned: true
bindings:
# inputs
input:
group: application_name_group
destination: topic-1,topic-2
content-type: application/json;charset=UTF-8
Затем я определил одного потребителя, который обрабатывает сообщения из всех этих тем.
@Component
@EnableBinding(Sink.class)
public class CommonConsumer {
private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);
@StreamListener(target = Sink.INPUT)
public void consumeMessage(final Message<Object> message) {
logger.info("Received a message: \nmessage:\n{}", message.getPayload());
// Here I define logic which handles messages depending on message headers and topic.
// In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
}
}
Обратите внимание, в вашем случае это может не быть решением. Мне нужно было пересылать сообщения на веб-перехватчики, чтобы иметь сопоставление конфигурации.
Я думал и о других идеях. 1) Вы клиент-клиент kafka без Spring Cloud.
2) Создайте заранее определенное количество входов, например 50.
input-1
intput-2
...
intput-50
А затем настройте некоторые из этих входов.
Связанные обсуждения
- Spring облачный поток для поддержки динамической маршрутизации сообщений
- https://github.com/spring-cloud/spring-cloud-stream/issues/690
- https://github.com/spring-cloud/spring-cloud-stream/issues/1089
Мы используем Spring Cloud 2.1.1 RELEASE
Для входящих сообщений вы можете явно использовать BinderAwareChannelResolver
для динамического разрешения места назначения. Вы можете проверить это пример, где router
приемник использует преобразователь каналов с привязкой.
dynamic
предназначена только для привязки производителя. Я считаю, что эта функция еще не рассматривается и не отслеживается как часть здесь: github.com/spring-cloud/spring-cloud-stream/issues/746
- person Ilayaperumal Gopinathan; 18.01.2017