Динамические каналы Spring Cloud Stream

Я использую Spring Cloud Stream и хочу программно создавать и связывать каналы. В моем случае при запуске приложения я получаю динамический список тем Kafka, на которые можно подписаться. Как я могу создать канал для каждой темы?


person Nikem    schedule 16.01.2017    source источник
comment
Вы можете проверить этот ответ на аналогичный вопрос здесь: stackoverflow.com/questions/40485421/   -  person Ilayaperumal Gopinathan    schedule 17.01.2017
comment
Это ответ для исходящих сообщений. Мне нужны входящие :(   -  person Nikem    schedule 17.01.2017
comment
ты нашел ответ? У меня такая же проблема. Было бы здорово, если бы вы могли указать мне правильное направление. Спасибо   -  person CCC    schedule 17.03.2019
comment
@CCC, нет, не видел. Мои требования изменились, поэтому для меня это больше не проблема.   -  person Nikem    schedule 18.03.2019


Ответы (5)


Недавно я столкнулся с подобным сценарием, и ниже мой пример динамического создания SubscriberChannels.

    ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1); 
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
    SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);
person sash    schedule 30.01.2018
comment
вы можете поделиться полным исходным кодом? - person CCC; 07.03.2019
comment
@sash, скажите, пожалуйста, где вы нашли этот код? это сработало для вас? - person Yan Khonski; 14.05.2019
comment
@YanKhonski, извини, но у меня больше нет настоящего источника :( Я написал вышеупомянутое после отладки и понимания того, как создаются потребители. Я постараюсь воссоздать это, когда позволит время. - person sash; 16.05.2019
comment
Конечно, без проблем, я решил и опубликовал свое решение. В любом случае, если вспомните, поделитесь, пожалуйста. - person Yan Khonski; 16.05.2019

Мне пришлось сделать что-то подобное для Компонент Camel Spring Cloud Stream. Возможно, вам будет полезен код потребителя для привязки пункта назначения "на самом деле просто String, указывающий имя канала"?

В моем случае я привязываю только один пункт назначения, однако я не думаю, что он концептуально сильно отличается для нескольких пунктов назначения.

Вот его суть:

    @Override
    protected void doStart() throws Exception {
        SubscribableChannel bindingTarget = createInputBindingTarget();
        bindingTarget.subscribe(message -> {
            // have your way with the received incoming message
        });

        endpoint.getBindingService().bindConsumer(bindingTarget,
                endpoint.getDestination());

       // at this point the binding is done
    }

    /**
     * Create a {@link SubscribableChannel} and register in the
     * {@link org.springframework.context.ApplicationContext}
     */
    private SubscribableChannel createInputBindingTarget() {
        SubscribableChannel channel = endpoint.getBindingTargetFactory()
                .createInputChannel(endpoint.getDestination());
        endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
        channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
                endpoint.getDestination());
        return channel;
    }

См. здесь, где вы найдете полный исходный текст и более подробный контекст.

person Donovan Muller    schedule 18.01.2017

У меня была задача, где я не знал заранее темы. Я решил это, имея один входной канал, который слушает все нужные мне темы.

https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_configuration_options.html.

Место назначения

Целевое назначение канала в связанном промежуточном программном обеспечении (например, обмен RabbitMQ или тема Kafka). Если канал привязан как потребитель, он может быть привязан к нескольким адресатам, а имена адресатов могут быть указаны как строковые значения, разделенные запятыми. Если не установлен, вместо него используется имя канала.

Итак, моя конфигурация

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

Затем я определил одного потребителя, который обрабатывает сообщения из всех этих тем.

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());
        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}

Обратите внимание, в вашем случае это может не быть решением. Мне нужно было пересылать сообщения на веб-перехватчики, чтобы иметь сопоставление конфигурации.

Я думал и о других идеях. 1) Вы клиент-клиент kafka без Spring Cloud.

2) Создайте заранее определенное количество входов, например 50.

input-1
intput-2
...
intput-50

А затем настройте некоторые из этих входов.

Связанные обсуждения

Мы используем Spring Cloud 2.1.1 RELEASE

person Yan Khonski    schedule 15.05.2019

Для входящих сообщений вы можете явно использовать BinderAwareChannelResolver для динамического разрешения места назначения. Вы можете проверить это пример, где router приемник использует преобразователь каналов с привязкой.

person Ilayaperumal Gopinathan    schedule 17.01.2017
comment
Я не понимаю. Я хочу подписаться на темы, названия которых я знаю только во время выполнения. Я не хочу отправлять / маршрутизировать сообщения. - person Nikem; 18.01.2017
comment
хорошо, извини; Я неправильно понял. Поддержка назначения dynamic предназначена только для привязки производителя. Я считаю, что эта функция еще не рассматривается и не отслеживается как часть здесь: github.com/spring-cloud/spring-cloud-stream/issues/746 - person Ilayaperumal Gopinathan; 18.01.2017
comment
@IlayaperumalGopinathan, вы знаете, обращались ли вы к этому когда-нибудь? - person CCC; 18.03.2019

person    schedule
comment
Пожалуйста, не публикуйте только код в качестве ответа, но также объясните, что делает ваш код и как он решает проблему вопроса. Ответы с объяснением обычно более полезны и качественнее, и с большей вероятностью получат положительные отзывы. - person Pouria Hemati; 21.11.2020