StreamRetryTemplate для Spring Cloud Streams не повторяет попытки в интеграционных тестах

Мы используем Spring Cloud Streams, которые слушают тему Kafka и вызывают службу отдыха. Мы также реализуем настраиваемый StreamRetryTemplate, чтобы указать, какие ошибки мы считаем исправляемыми, а какие - нет. Я не могу получить согласованные результаты между тем, как это работает во время выполнения, и тем, как это работает в интеграционных тестах.

Я проверил в режиме отладки, что исключение генерируется правильно и что RetryTemplate вводится правильно, но, похоже, он просто не используется в моих интеграционных тестах.

@EnableBinding(Sink::class)
class MyListener(private val myService: Service) {

  @StreamListener(Sink.Input)
  fun consume(@Payload msg: MyMessage) = myService.process(msg)

  @SteamRetryTemplate
  fun getRetryTemplate() = RetryTemplate()
}

Когда я запускаю это приложение и myService выдает исключение, я ожидаю, что он будет повторен, и он делает это отлично. Но когда я пишу интеграционные тесты с сервером Wiremock и вызываю исключение myService, он не пытается повторить попытку. У меня есть операторы assert, чтобы проверить, сколько раз была достигнута моя конечная точка wiremock.

Мне что-то не хватает специально для повторных попыток работы в интеграционных тестах?


person chas spenlau    schedule 25.04.2019    source источник


Ответы (1)


Вы используете тестовую привязку или встроенный брокер kafka? Испытательное связующее довольно ограничено; использование встроенного брокера предпочтительнее для полного интеграционного тестирования.

См. Тестирование приложений Spring для документации Apache Kafka.

ИЗМЕНИТЬ

@SpringBootApplication
@EnableBinding(Sink.class)
public class So55855151Application {

    public static void main(String[] args) {
        SpringApplication.run(So55855151Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("fail");
    }

    @StreamRetryTemplate
    public RetryTemplate retrier() {
        return new RetryTemplate();
    }

}
spring.cloud.stream.bindings.input.group=input
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class So55855151ApplicationTests {

    @Autowired
    private KafkaTemplate<byte[], byte[]> template;

    @Autowired
    private RetryTemplate retrier;

    @Test
    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        this.retrier.registerListener(new RetryListener() {

            @Override
            public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                System.out.println("open");
                latch.countDown();
                return true;
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                    Throwable throwable) {

                System.out.println("close");
                latch.countDown();
            }

            @Override
            public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                    Throwable throwable) {

                System.out.println("onError: " + throwable);
                latch.countDown();
            }

        });

        this.template.send("input", "test".getBytes());
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }

}
person Gary Russell    schedule 25.04.2019
comment
Я использую EmbeddedKafkaBroker с WireMockServer. Спасибо за ссылку на документацию, я обязательно прочту ее и посмотрю, есть ли для меня какие-либо ответы. - person chas spenlau; 25.04.2019
comment
Если вы используете Spring Initializr для создания своего проекта, вам необходимо удалить spring-cloud-stream-test-support jar из pom (или исключить тестовую привязку в вашем тесте). В противном случае тестовая связка победит. Я только что протестировал пользовательский StreamRetryTemplate и вижу, что он используется в тестовом примере. - person Gary Russell; 25.04.2019
comment
Я добавил свой тест. - person Gary Russell; 25.04.2019
comment
Спасибо вам большое за это. Извините, я задержался с ответом. Ваш пример меня очень близко подобрал! У вас есть хороший пример, чтобы эта точная или похожая тестовая установка работала с сериализованным объектом Avro вместо простого byte []? Я пробовал использовать здесь пример ‹objectpartners. com / 2018/08/21 / ›для MockSchemaRegistryClient, но я не могу заставить его работать. - person chas spenlau; 01.05.2019
comment
Извините, я не работал с реестром схем. Предлагаю вам задать новый вопрос с подробностями. - person Gary Russell; 01.05.2019
comment
Я отмечаю ваш ответ как правильный, поскольку я уверен, что если бы я не использовал Avro (в моем первоначальном вопросе этого явно не было), вы были бы на высоте. Спасибо за вашу помощь! - person chas spenlau; 02.05.2019