Реактивные расширения (Rx) Switch() создают новый наблюдаемый объект, который не подписан на предоставленный OnCompleted()

У меня проблема с моей подпиской Rx с использованием оператора Switch.

_performSearchSubject
            .AsObservable()
            .Select(_ => PerformQuery())
            .Switch()
            .ObserveOn(_synchronizationContextService.SynchronizationContext)
            .Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
            .DisposeWith(this);

Поток:

  1. Некоторые свойства изменяются, и вызывается метод PerformSearchSubject.OnNext.
  2. Вызывается метод PerformPositionQuery(), который возвращает наблюдателя каждый раз, когда он нажимается.
  3. Служба, которая отвечает через этого наблюдателя, дважды вызывает OnNext и один раз OnCompleted, когда получение данных завершено.
  4. Метод DataArrivedForPositions вызывается дважды, как и ожидалось.
  5. Метод PositionQueryCompleted никогда не вызывается, хотя внутри моего сервиса данных вызываетсяObserver.OnCompleted().

Код для dataService:

        protected override void Request(Request request, IObserver<Response> observer)
        {
            query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
            query.Error += (type, s, message) => QueryError(observer, message);
            query.NoMoreData += id => QueryCompleted(observer);

            query.Execute(request);
        }

        private void QueryError(IObserver<PositionSheetResponse> observer, string message)
        {
            observer.OnError(new Exception(message));
        }

        private void QueryCompleted(IObserver<PositionSheetResponse> observer)
        {
            observer.OnCompleted();
        }

        private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
        {
            observer.OnNext(ConvertToResponse(requestId, receiveData));
        }

person VidasV    schedule 28.12.2015    source источник


Ответы (1)


Switch результат будет выполнен только после завершения вашего внешнего наблюдаемого (_performSearchSubject). Я предполагаю, что в вашем случае это никогда не происходит (вероятно, это связано с действием пользователя, выполняющим поиск).

Что неясно, так это когда вы ожидаете вызова PositionQueryCompleted. Если это происходит после обработки каждого успешного запроса, то ваш поток необходимо изменить, потому что Switch потерял информацию о том, что поток запросов завершен, но ему также не хватает информации о пользовательском интерфейсе (даже неправильном планировщике), чтобы сказать, были ли его данные фактически обработаны.

Могут быть и другие способы добиться этого, но в основном вы хотите, чтобы ваш поток запросов был завершен, чтобы выжить до Switch (которые в настоящее время игнорируют это событие). Например, вы можете преобразовать свой поток запросов, чтобы иметь n+1 событий, с одним дополнительным для завершения:

    _performSearchSubject
        .AsObservable()
        .Select(_ => 
                  PerformQuery()
                  .Select(Data => new { Data, Complete = false})
                  .Concat(Observable.Return(new { Data = (string)null, Complete = true })))

На него можно смело применять .Switch().ObserveOn(_synchronizationContextService.SynchronizationContext), но тогда нужно модифицировать подписку:

    .Subscribe(data => {
        if (data.Complete) DataArrivedForPositions(data.Data);
        else PositionQueryCompleted()
    }, PositionQueryError)
person Gluck    schedule 28.12.2015
comment
Ваше предположение верно! Каждый раз, когда я вызываю PerformSearcsubject.OnNext() при нажатии кнопки, он создает новый наблюдаемый объект, который используется переключателем. Хотя эта тема никогда не завершается. Я хочу, чтобы метод PositionQueryCompleted вызывался всякий раз, когда последний запрос успешно завершается. Значение внутреннего наблюдаемого OnCompleted() было вызвано. - person VidasV; 28.12.2015
comment
Добавлено решение, которое предполагает, что ваш объект результатов запроса еще не включает «информацию о завершении», потому что в остальном это тривиально. - person Gluck; 28.12.2015
comment
Во-первых, большое Вам спасибо! Во-вторых, я вижу вашу точку зрения и понимаю серое пятно на моем мышлении. Тем не менее, у меня остается вопрос: почему подписка OnNext сохраняется на более свежем наблюдаемом, а OnError и OnCompleted — нет? Что мне там не хватает? - person VidasV; 29.12.2015
comment
OnError также сохраняется (почти уверен, что Switch вызовет OnError, если какая-либо из ошибок внутреннего потока), но OnComplete нет и не может быть, потому что: это означало бы, что дальнейший поиск невозможен) - поведение переключения было бы недетерминированным, потому что переключение с одного потока, завершающегося на другой, будет иметь очень различное поведение, независимо от того, завершается ли первый поток до или после второго OnNext, и что не сделал бы этот оператор очень полезным - person Gluck; 29.12.2015
comment
Огромное спасибо за ваши усилия, теперь я правильно понял. Кроме того, я не ожидал, что так быстро получу ответ на Rx... - person VidasV; 29.12.2015
comment
Помните, что протокол для Rx — OnNext* (OnError | OnCompleted). то есть у вас может быть 0, 1 или много OnNext обратных вызовов. При желании вы можете иметь обратный вызов OnError или OnCompleted после ваших обратных вызовов 0-many OnNext. Однако после того, как вы получили OnError или OnCompleted, последовательность является конечной и не должна производить больше никаких обратных вызовов. Вот почему ваш коммутатор не будет передавать внутренние обратные вызовы OnCompleted. - person Lee Campbell; 30.12.2015