RxJava подписывается и наблюдает за тем же потоком, что и модульный тест

Я хочу написать своего рода «тест черного ящика» для компонента, который внутренне использует RxJava.

Внутри он использует Retrofit, который возвращает Observable для выполнения HTTP-вызова, а затем использует .flatmap() для будущей обработки данных, полученных при модернизации. Идея состоит в том, чтобы дать этому компоненту Transformer для установки планировщиков в наблюдателе следующим образом:

class DefaultTransformer <T> implements Transformer<T, T> {

   public Observable<T> call(Observable<T> observable) { 
      return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
   }
}

Мой компонент делает что-то вроде этого:

void execute(Transformer<T, T> scheduler){
     Observable<List<Team>> observable = retrofitApi.getLeague(leagueId, seasonId)
        .flatMap(new Func1<LeagueWrapper, Observable<List<Team>>>() {
          @Override public Observable<List<Team>> call(LeagueWrapper wrapper) {                
             return Observable.just(wrapper.getLeague().getTeams());
          }
        });

   observable.compose(transformer);

   observable.subscribe(this);
}

В рабочей среде я передаю DefaultTransformer в качестве параметра, но для модульных тестов я хочу отправить Transformer, который выполняется в том же потоке, что и модульный тест, поэтому все должно выполняться синхронно (а не асинхронно).

Я пробовал это:

class UnitTestTransformer <T> implements Transformer<T, T> {

       public Observable<T> call(Observable<T> observable) { 
          return observable.subscribeOn(Schedulers.test()).observeOn(AndroidSchedulers.test());
       }
    }

Но он по-прежнему работает асинхронно в моих модульных тестах. Я также пробовал Scheduler.immediate(). toBlocking() похоже не вариант, потому что это уже не Observable. Есть идеи, что может быть не так?


person sockeqwe    schedule 18.03.2015    source источник
comment
observable.compose(transformer); это проблема. Все экземпляры Observable являются неизменяемыми, и вызов compose не изменяет существующий Observable, а возвращает новый (который игнорируется в вашем коде). observable.compose(transformer).subscribe(this) должно работать нормально.   -  person Vladimir Mironov    schedule 18.03.2015
comment
Еще кое-что. Все Observable, возвращаемые retrofit, уже имеют указанный Scheduler, потому что retrofit вызывает subscribeOn внутренне. Вызов subscribeOn еще раз фактически не изменит окончательный планировщик, в котором будет выполняться сетевой вызов.   -  person Vladimir Mironov    schedule 18.03.2015
comment
Если вы хотите протестировать свой код, вам следует использовать оператор toBlocking (который очень часто используется в большинстве модульных тестов RxJava).   -  person Vladimir Mironov    schedule 18.03.2015
comment
Привет, спасибо за подсказку compose()! Значит нет возможности обновить Scheduler дооснащения? toBlock() у меня не сработает, потому что я не могу изменить метод execute() моего компонента, или вы видите обходной путь для этого? Метод execute будет вызываться внутри (среди других вызовов, отличных от RxJava). Единственное, что я могу сделать, это установить Scheduler с Component.setScheduler(Transformer t), который будет передан в качестве аргумента для Component.execute()   -  person sockeqwe    schedule 18.03.2015


Ответы (2)


Если изменение шаблона вызова execute() не вариант, вы можете попробовать использовать механизм RxJava Plugin.

https://github.com/ReactiveX/RxJava/wiki/Plugins

Вы можете предоставить:

  • RxJavaSchedulersHook, чтобы переопределить планировщики, предоставленные во время выполнения теста, и заставить их выполняться синхронно
  • RxJavaObservableExecutionHook для подключения к конвейеру выполнения Observable и использования какого-либо метода синхронизации (например, CountdownLatch), чтобы дождаться завершения подписки Observable, прежде чем продолжить.
person Ross Hambrick    schedule 23.03.2015

У меня была аналогичная проблема, и я решил ее с помощью класса TestObserver (http://reactivex.io/RxJava/javadoc/rx/observers/TestObserver.html)

Он имеет следующие три метода, предоставляющие доступ к полученным событиям:

getOnCompletedEvents()
getOnErrorEvents()
getOnNextEvents()

Я надеюсь, что это может помочь и вам, если вы сможете как-то внедрить своего подписчика. Вот пример того, как я это тестировал:

TestObserver<MyModel> testObserver = new TestObserver<>();
myObservableSupplyingMethod().subscribe(testObserver);

assertThat(testObserver.getOnErrorEvents().size()).isEqualTo(0);
assertThat(testObserver.getOnNextEvents().get(0)).isNotNull();
...
person Maximosaic    schedule 03.06.2015