Spring Cloud Stream Kafka Stream Binder, MessageChannel не создан

У меня проблема с моим простым тестовым приложением. Я хотел бы создать потребителя с связывателем потока kafka, например this.

@SpringBootApplication
public class CloudStreamAggregatorApplication {

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamAggregatorApplication.class, args);
  }
  @Bean
  public Consumer<KStream<String,String>> consume() {
    return input -> input.foreach((k,v) -> System.out.println("CONSUMER: "+v));
  }
}

Но когда я пытаюсь это проверить

@SpringBootTest
@EmbeddedKafka
@Import(TestChannelBinderConfiguration.class)
@DirtiesContext
class CloudStreamConsumerApplicationTests {

  @Autowired
  private InputDestination input;

  @Test
  void test01_Consume() {
    input.send(new GenericMessage<>("test"));
  }
}

Я получаю исключение

java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
  at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) ~[na:na]
  at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) ~[na:na]
  at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248) ~[na:na]
  at java.base/java.util.Objects.checkIndex(Objects.java:359) ~[na:na]
  at java.base/java.util.ArrayList.get(ArrayList.java:427) ~[na:na]
  at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34) ~[spring-cloud-stream-3.1.0-test-binder.jar:3.1.0]

И когда я меняю метод потребления на

@Bean
public Consumer<Flux<String>> consume() {
    return f -> f.subscribe(p -> System.out.println("CONSUMER: "+p));
}

It's ok.

Пытаюсь распечатать созданные каналы

@Autowired
private Map<String,MessageChannel> channels;
...
channels.forEach((k,v) -> System.out.println("CHANNEL: "+k));

Я получаю в первом случае

CHANNEL: nullChannel
CHANNEL: errorChannel

А во втором случае

CHANNEL: nullChannel
CHANNEL: errorChannel
CHANNEL: consume-in-0
CHANNEL: test.anonymous.errors

Я не понял, почему это происходит. Кто-нибудь может мне помочь!


person Igor Tytar    schedule 24.01.2021    source источник


Ответы (1)


Связыватель Kafka Streams не является MessageChannelBinder, поэтому он не использует внутренние каналы сообщений.

Вы не можете тестировать с помощью привязки каналов тестовых сообщений.

person Gary Russell    schedule 25.01.2021
comment
Спасибо. Но как правильно протестировать связыватель потока kafka? - person Igor Tytar; 30.01.2021
comment
Используйте настоящего брокера с EmbeddedKafkaBroker spring-kafka-test - см. образцы для примеров. Подробнее о встроенном брокере здесь и в его Javadocs. - person Gary Russell; 30.01.2021
comment
Мне кажется, IndexOutOfBoundsException в данном случае не подходит :) например IllegalStateException будет более понятным. - person Igor Tytar; 31.01.2021
comment
Я не возражаю. Это не интуитивно. Рассмотрите возможность открытия вопроса на GitHub. - person Gary Russell; 31.01.2021