С помощью @Gary Russell я смог получить ответ и использовал следующее решение.
Вывод: я должен признать, что мне безразлично это решение (похоже на взлом), но это единственное, что я могу заставить работать, и как только вы пройдете первоначальную однократную настройку и действительно поймете «рабочий поток» это не так уж больно. В основном сводится к определению (2) @Beans и добавлению их в конфигурацию Integration Test.
Пример решения размещен ниже с пояснениями. Не стесняйтесь предлагать улучшения этого решения.
1. Определите ProxyListenerBPP, который во время инициализации Spring будет прослушивать указанный clazz (то есть наш тестовый класс, содержащий @RabbitListener) и внедрять нашу настраиваемую рекомендацию CountDownLatchListenerInterceptor, определенную на следующем шаге.
import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;
/**
* Implements BeanPostProcessor bean... during spring initialization we will
* listen for a specified clazz
* (i.e our @RabbitListener annotated class) and
* inject our custom CountDownLatchListenerInterceptor advice
* @author sjacobs
*
*/
public class ProxyListenerBPP implements BeanPostProcessor, BeanFactoryAware, Ordered, PriorityOrdered{
private BeanFactory beanFactory;
private Class<?> clazz;
public static final String ADVICE_BEAN_NAME = "wasCalled";
public ProxyListenerBPP(Class<?> clazz) {
this.clazz = clazz;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (clazz.isAssignableFrom(bean.getClass())) {
ProxyFactoryBean pfb = new ProxyFactoryBean();
pfb.setProxyTargetClass(true); // CGLIB, false for JDK proxy (interface needed)
pfb.setTarget(bean);
pfb.addAdvice(this.beanFactory.getBean(ADVICE_BEAN_NAME, Advice.class));
return pfb.getObject();
}
else {
return bean;
}
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE - 1000; // Just before @RabbitListener post processor
}
2. Создайте совет MethodInterceptor impl, который будет содержать ссылку на CountDownLatch. На CountDownLatch необходимо ссылаться как в тестовом потоке интеграции, так и внутри асинхронного рабочего потока в @RabbitListener. Так что позже мы сможем вернуться к потоку Integration Test как только асинхронный поток @RabbitListener завершит выполнение. Нет необходимости в опросе.
import java.util.concurrent.CountDownLatch;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
/**
* AOP MethodInterceptor that maps a <b>Single</b> CountDownLatch to one method and invokes
* CountDownLatch.countDown() after the method has completed execution. The motivation behind this
* is for integration testing purposes of Spring RabbitMq Async Worker threads to be able to merge
* the Integration Test thread after an Async 'worker' thread completed its task.
* @author sjacobs
*
*/
public class CountDownLatchListenerInterceptor implements MethodInterceptor {
private CountDownLatch countDownLatch = new CountDownLatch(1);
private final String methodNameToInvokeCDL ;
public CountDownLatchListenerInterceptor(String methodName) {
this.methodNameToInvokeCDL = methodName;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
String methodName = invocation.getMethod().getName();
if (this.methodNameToInvokeCDL.equals(methodName) ) {
//invoke async work
Object result = invocation.proceed();
//returns us back to the 'awaiting' thread inside the integration test
this.countDownLatch.countDown();
//"reset" CountDownLatch for next @Test (if testing for more async worker)
this.countDownLatch = new CountDownLatch(1);
return result;
} else
return invocation.proceed();
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
}
3. Затем добавьте в конфигурацию теста интеграции следующие @Bean-компоненты.
public class SomeClassThatHasRabbitListenerAnnotationsITConfig extends BaseIntegrationTestConfig {
// pass into the constructor the test Clazz that contains the @RabbitListener annotation into the constructor
@Bean
public static ProxyListenerBPP listenerProxier() { // note static
return new ProxyListenerBPP(SomeClassThatHasRabbitListenerAnnotations.class);
}
// pass the method name that will be invoked by the async thread in SomeClassThatHasRabbitListenerAnnotations.Class
// I.E the method name annotated with @RabbitListener or @RabbitHandler
// in our example 'listen' is the method name inside SomeClassThatHasRabbitListenerAnnotations.Class
@Bean(name=ProxyListenerBPP.ADVICE_BEAN_NAME)
public static Advice wasCalled() {
String methodName = "listen";
return new CountDownLatchListenerInterceptor( methodName );
}
// this is the @RabbitListener bean we are testing
@Bean
public SomeClassThatHasRabbitListenerAnnotations rabbitListener() {
return new SomeClassThatHasRabbitListenerAnnotations();
}
}
4. Наконец, в интеграционном вызове @Test ... после отправки сообщения через rabbitTemplate для запуска асинхронного потока ... теперь вызовите метод CountDownLatch # await (...), полученный от перехватчика. и обязательно передайте аргументы TimeUnit, чтобы он мог истечь по таймауту в случае длительного процесса или что-то пойдет не так. После асинхронного выполнения поток Integration Test уведомляется (пробуждается), и теперь мы, наконец, можем приступить к фактическому тестированию / проверке / проверке результатов асинхронной работы.
@ContextConfiguration(classes={ SomeClassThatHasRabbitListenerAnnotationsITConfig.class } )
public class SomeClassThatHasRabbitListenerAnnotationsIT extends BaseIntegrationTest{
@Inject
private CountDownLatchListenerInterceptor interceptor;
@Inject
private RabbitTemplate rabbitTemplate;
@Test
public void shouldReturnBackAfterAsyncThreadIsFinished() throws Exception {
MyObject payload = new MyObject();
rabbitTemplate.convertAndSend("some.defined.work.queue", payload);
CountDownLatch cdl = interceptor.getCountDownLatch();
// wait for async thread to finish
cdl.await(10, TimeUnit.SECONDS); // IMPORTANT: set timeout args.
//Begin the actual testing of the results of the async work
// check the database?
// download a msg from another queue?
// verify email was sent...
// etc...
}
person
Selwyn
schedule
27.12.2015