Как написать интеграционный тест для аннотации @RabbitListener?

Мой вопрос действительно является продолжением вопроса к

Тест интеграции RabbitMQ и потоковая передача

Там говорится, что нужно обернуть «ваших слушателей» и передать CountDownLatch, и в конечном итоге все потоки объединятся. Этот ответ работает, если мы вручную создавали и вводили прослушиватель сообщений, но для аннотаций @RabbitListener ... я не уверен, как передать CountDownLatch. Фреймворк автоматически волшебным образом создает слушателя сообщений за кулисами.

Есть ли другие подходы?


person Selwyn    schedule 24.12.2015    source источник


Ответы (2)


С помощью @Gary Russell я смог получить ответ и использовал следующее решение.

Вывод: я должен признать, что мне безразлично это решение (похоже на взлом), но это единственное, что я могу заставить работать, и как только вы пройдете первоначальную однократную настройку и действительно поймете «рабочий поток» это не так уж больно. В основном сводится к определению (2) @Beans и добавлению их в конфигурацию Integration Test.

Пример решения размещен ниже с пояснениями. Не стесняйтесь предлагать улучшения этого решения.

1. Определите ProxyListenerBPP, который во время инициализации Spring будет прослушивать указанный clazz (то есть наш тестовый класс, содержащий @RabbitListener) и внедрять нашу настраиваемую рекомендацию CountDownLatchListenerInterceptor, определенную на следующем шаге.

import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;

/**
 * Implements BeanPostProcessor bean... during spring initialization we will
 * listen for a specified clazz 
 * (i.e our @RabbitListener annotated class) and 
 * inject our custom CountDownLatchListenerInterceptor advice
 * @author sjacobs
 *
 */
public class ProxyListenerBPP implements BeanPostProcessor, BeanFactoryAware, Ordered, PriorityOrdered{

    private BeanFactory beanFactory;
    private Class<?> clazz;
    public static final String ADVICE_BEAN_NAME = "wasCalled";

    public ProxyListenerBPP(Class<?> clazz) {
        this.clazz = clazz;
    }


    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if (clazz.isAssignableFrom(bean.getClass())) {
            ProxyFactoryBean pfb = new ProxyFactoryBean();
            pfb.setProxyTargetClass(true); // CGLIB, false for JDK proxy (interface needed)
            pfb.setTarget(bean);
            pfb.addAdvice(this.beanFactory.getBean(ADVICE_BEAN_NAME, Advice.class));
            return pfb.getObject();
        }
        else {
            return bean;
        }
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE - 1000; // Just before @RabbitListener post processor
    }

2. Создайте совет MethodInterceptor impl, который будет содержать ссылку на CountDownLatch. На CountDownLatch необходимо ссылаться как в тестовом потоке интеграции, так и внутри асинхронного рабочего потока в @RabbitListener. Так что позже мы сможем вернуться к потоку Integration Test как только асинхронный поток @RabbitListener завершит выполнение. Нет необходимости в опросе.

import java.util.concurrent.CountDownLatch;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

/**
 * AOP MethodInterceptor that maps a <b>Single</b> CountDownLatch to one method and invokes 
 * CountDownLatch.countDown() after the method has completed execution. The motivation behind this 
 * is for integration testing purposes of Spring RabbitMq Async Worker threads to be able to merge
 * the Integration Test thread after an Async 'worker' thread completed its task. 
 * @author sjacobs
 *
 */
public class CountDownLatchListenerInterceptor implements MethodInterceptor {

    private CountDownLatch  countDownLatch =  new CountDownLatch(1);

    private final String methodNameToInvokeCDL ;

    public CountDownLatchListenerInterceptor(String methodName) {
        this.methodNameToInvokeCDL = methodName;
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        String methodName = invocation.getMethod().getName();

        if (this.methodNameToInvokeCDL.equals(methodName) ) {

            //invoke async work 
            Object result = invocation.proceed();

            //returns us back to the 'awaiting' thread inside the integration test
            this.countDownLatch.countDown();

            //"reset" CountDownLatch for next @Test (if testing for more async worker)
            this.countDownLatch = new CountDownLatch(1);

            return result;
        } else
            return invocation.proceed();
    }


    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }
}

3. Затем добавьте в конфигурацию теста интеграции следующие @Bean-компоненты.

public class SomeClassThatHasRabbitListenerAnnotationsITConfig extends BaseIntegrationTestConfig {

    // pass into the constructor the test Clazz that contains the @RabbitListener annotation into the constructor
    @Bean
    public static ProxyListenerBPP listenerProxier() { // note static
        return new ProxyListenerBPP(SomeClassThatHasRabbitListenerAnnotations.class);
    }

     // pass the method name that will be invoked by the async thread in SomeClassThatHasRabbitListenerAnnotations.Class
    // I.E the method name annotated with @RabbitListener or @RabbitHandler
    // in our example 'listen' is the method name inside SomeClassThatHasRabbitListenerAnnotations.Class
    @Bean(name=ProxyListenerBPP.ADVICE_BEAN_NAME)
    public static Advice wasCalled() {
        String methodName = "listen";  
        return new CountDownLatchListenerInterceptor( methodName );
    }

    // this is the @RabbitListener bean we are testing
    @Bean
    public SomeClassThatHasRabbitListenerAnnotations rabbitListener() {
         return new SomeClassThatHasRabbitListenerAnnotations();
    }

}

4. Наконец, в интеграционном вызове @Test ... после отправки сообщения через rabbitTemplate для запуска асинхронного потока ... теперь вызовите метод CountDownLatch # await (...), полученный от перехватчика. и обязательно передайте аргументы TimeUnit, чтобы он мог истечь по таймауту в случае длительного процесса или что-то пойдет не так. После асинхронного выполнения поток Integration Test уведомляется (пробуждается), и теперь мы, наконец, можем приступить к фактическому тестированию / проверке / проверке результатов асинхронной работы.

@ContextConfiguration(classes={ SomeClassThatHasRabbitListenerAnnotationsITConfig.class } )
public class SomeClassThatHasRabbitListenerAnnotationsIT extends BaseIntegrationTest{

    @Inject 
    private CountDownLatchListenerInterceptor interceptor;

    @Inject
    private RabbitTemplate rabbitTemplate;

    @Test
    public void shouldReturnBackAfterAsyncThreadIsFinished() throws Exception {

     MyObject payload = new MyObject();
     rabbitTemplate.convertAndSend("some.defined.work.queue", payload);
        CountDownLatch cdl = interceptor.getCountDownLatch();      

        // wait for async thread to finish
        cdl.await(10, TimeUnit.SECONDS);    // IMPORTANT: set timeout args. 

        //Begin the actual testing of the results of the async work
        // check the database? 
        // download a msg from another queue? 
        // verify email was sent...
        // etc... 
}
person Selwyn    schedule 27.12.2015
comment
Не стесняйтесь открывать «новую функцию» JIRA Issue, чтобы мы могли добавить несколько хуков для тестирования. поддержка по дорожной карте. - person Gary Russell; 29.12.2015

С @RabbitListener немного сложнее, но самый простой способ - посоветовать слушателю.

С помощью фабрики настраиваемых контейнеров слушателей просто пусть ваш тестовый пример добавит совет к фабрике.

Совет будет MethodInterceptor; вызов будет иметь 2 аргумента; канал и (неконвертированный) Message. Совет должен быть введен до создания контейнера (ов).

Также можно получить ссылку на контейнер с помощью реестра и добавьте совет позже (но вам придется позвонить initialize(), чтобы применить новый совет).

Альтернативой может быть простое BeanPostProcessor проксирование вашего класса слушателя до того, как он будет введен в контейнер. Таким образом, вы увидите аргументы метода после любого преобразования; вы также сможете проверить любой результат, возвращаемый слушателем (для сценариев запроса / ответа).

Если вы не знакомы с этими методами, я могу попытаться найти время, чтобы привести вам быстрый пример.

ИЗМЕНИТЬ

Я отправил запрос на вытягивание, чтобы добавить пример в EnableRabbitIntegrationTests. Это добавляет bean-компонент слушателя с двумя аннотированными методами слушателя, BeanPostProcessor, который проксирует bean-компонент слушателя перед тем, как он будет вставлен в контейнер слушателя. К прокси-серверу добавляется Advice, который подсчитывает количество защелок при получении ожидаемых сообщений.

person Gary Russell    schedule 24.12.2015
comment
Спасибо за быстрый ответ. Был бы очень признателен за пример - person Selwyn; 24.12.2015
comment
Готово. - person Gary Russell; 24.12.2015
comment
Спасибо за ответ. Лично я не большой поклонник внедрения АОП, API отражения и т. Д. В бизнес-логику, особенно тесты и интеграционные тесты. Хотелось бы, чтобы расшифровка тестов была максимально интуитивной (принцип KISS). Можно ли было бы усовершенствовать новую аннотацию EnableRabbitCountDownLatch, которая принимает в качестве аргументов int countDown и создает bean-компонент countDownLatch, который позже может быть введен в наши тесты? Я предполагаю, что аннотацию можно поместить в конфигурацию или, может быть, как часть EnableRabbit, я не уверен, что это лучшее место. - person Selwyn; 25.12.2015
comment
поэтому countDown () сработает после того, как метод, аннотированный RabbitListener, завершит выполнение. Или этот запрос слишком специфичен для варианта использования? Мне очень нравится новая абстракция RabbitListener, которая упрощает создание messageListener, но выглядит так, как будто за нее приходится платить во время интеграционных тестов. - person Selwyn; 25.12.2015
comment
>introducing AOP, reflection api, etc... into business logic. Это не затрагивает вашу бизнес-логику, это чрезвычайно легкая прокладка между контейнером слушателя и вашим слушателем. Первый тест отсчитывает до звонка; второй после, но вы можете делать все, что хотите. Мы могли бы рассмотреть возможность добавления тестовой поддержки в фреймворк, например. @RabbitIntegrationTest но, скорее всего, мы бы реализовали это с помощью этой техники; мы бы не хотели загрязнять основной код артефактами тестирования. Конечно, полный интеграционный тест подтвердит все, что делается ниже вашего слушателя. - person Gary Russell; 25.12.2015
comment
>assertTrue(this.interceptor.oneWayLatch.await(10, TimeUnit.SECONDS)); вызов этого внутри интеграционного теста вводит логику, не имеющую ничего общего с бизнес-тестом. У меня нет проблем с spring ВНУТРЕННЕЕ с использованием aop, отражения и т. Д., И я попытался реализовать этот пример, и я действительно борюсь с ним. У меня странное поведение. Мои тесты проходят, если я запускаю их в обычном режиме, но терпят неудачу, если я пытаюсь пройти их с помощью отладчика ... Более того, масштабируемость ограничена, потому что вам нужно реализовать 1 countDownLatch для каждого вызова метода RabbitListener. - person Selwyn; 26.12.2015
comment
и становится трудно тестировать другие функции. Например, если я хочу протестировать несколько вызовов прослушивателя для одного кролика в одном и том же тесте, или если я хочу протестировать настраиваемый «валидатор», или ожидаемую обработку исключений через RetryHandler. Это усложняет объединение потоков вручную через countDownLatch. Я действительно заблудился прямо сейчас относительно того, как двигаться дальше, я хочу, чтобы асинхронные потоки в производстве для контейнера rabbitListener через аннотацию, но для теста интеграции я действительно хотел бы `` как-то '' просто выполнить всю логику последовательно без иметь дело с проблемами параллелизма - person Selwyn; 26.12.2015
comment
Я дал очень простой совет, вы можете сделать его настолько сложным, насколько захотите - вставляйте новые защелки из каждого теста и т. Д .; Я не уверен, что вас беспокоит с calling that inside the integration test is introducing logic that has nothing to do with the business taste case. - это всегда проблема при тестировании асинхронных приложений - как-то дождаться завершения какого-то внешнего события; Вы должны покрыть функциональность вашего слушателя модульными тестами. Как я уже сказал, для полного теста интеграции e2e вам действительно нужно запустить что-то для проверки из того, что ниже по течению от вашего слушателя. - person Gary Russell; 26.12.2015
comment
достаточно честно, но я думаю, я просто хотел быть «абстрагированным» от внутренней части spring framework / rabbit ... чувствовал, что я боролся с этой проблемой. Несмотря на это, я заставил это работать, в конце концов, спасибо за примеры. - person Selwyn; 27.12.2015
comment
Поскольку я новичок в этом, есть ли простой проект JUnit / Test с convertAndSend, а на другом конце - реальный @RabbitListener и упаковка, как обсуждалось, не касаясь реального производственного кода. Что-то, что я могу git clone, запустить и попробовать. - person powder366; 14.03.2017
comment
Вам следует задавать новые вопросы, а не добавлять новый вопрос в комментарий. См. справочную документацию о поддержке тестирования. Если после прочтения у вас возникнет конкретный вопрос; это как новый вопрос с тегом [spring-amqp]. - person Gary Russell; 14.03.2017