Как использовать forkJoin Observable с собственным событием

Я использую Subject of reactivex в своем приложении angular2, чтобы сигнализировать о событии.

Когда я делаю что-то подобное:

let subject1 = new Subject<string>();
let subject2 = new Subject<string>();
subject1.subscribe(data=>console.debug(data));        
subject2.subscribe(data=>console.debug(data));        
subject1.next("this is test event1");
subject2.next("this is test event2");

все работает нормально, но я хочу дождаться срабатывания обоих событий, а затем выполнить некоторые действия. Я нашел Observable.forkJoin, но не могу заставить его работать с субъектами. Такой код не работает

Observable.forkJoin(
           subject1.asObservable(),
           subject2.asObservable()
        ).subscribe(
            data => {
              console.debug("THIS IS MY FJ");
              console.debug(JSON.stringify(data));
            },
            error=>console.error(error),
            ()=>{
              console.info('THIS IS MY FJ SUCCESS');
            }
        );        

Можете ли вы помочь мне с этим вопросом, пожалуйста.

С наилучшими пожеланиями Кшиштоф Шевчик


person Krzysztof Szewczyk    schedule 06.05.2016    source источник
comment
есть ли ошибка?   -  person eko    schedule 06.05.2016
comment
@echonax нет, ничего, просто метод подписки onNext никогда не вызывается   -  person Krzysztof Szewczyk    schedule 06.05.2016
comment
Кроме того, это не работает, потому что вам нужно вызвать complete на Subject, чтобы получить эмиссию от forkJoin.   -  person paulpdaniels    schedule 08.05.2016


Ответы (1)


В вашем случае вместо этого вам нужно использовать оператор zip. Этот оператор объединит указанные наблюдаемые последовательности, тогда как forkJoin запускает все наблюдаемые последовательности параллельно и собирает их последние элементы.

Таким образом, оператор forkJoin подходит, например, для наблюдаемых HTTP, но не для субъектов.

Вот образец.

export class App {
  subject1: Subject<string> = new Subject();
  subject2: Subject<string> = new Subject();

  constructor() {
    this.subject1.subscribe(data=>console.debug(data));        
    this.subject2.subscribe(data=>console.debug(data));        

    Observable.zip(
      this.subject1,
      this.subject2
    ).subscribe(
      data => {
        console.debug("THIS IS MY FJ");
        console.debug(JSON.stringify(data));
      },
      error=>console.error(error),
      ()=>{
        console.info('THIS IS MY FJ SUCCESS');
      }
  );        
}

test() {
  this.subject1.next("this is test event1");
  this.subject2.next("this is test event2");
}

См. соответствующий планкр: https://plnkr.co/edit/X74lViYOgcxzb1AjC9dL?p=preview.

person Thierry Templier    schedule 06.05.2016