У меня проблема с моей подпиской Rx с использованием оператора Switch.
_performSearchSubject
.AsObservable()
.Select(_ => PerformQuery())
.Switch()
.ObserveOn(_synchronizationContextService.SynchronizationContext)
.Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
.DisposeWith(this);
Поток:
- Некоторые свойства изменяются, и вызывается метод PerformSearchSubject.OnNext.
- Вызывается метод PerformPositionQuery(), который возвращает наблюдателя каждый раз, когда он нажимается.
- Служба, которая отвечает через этого наблюдателя, дважды вызывает OnNext и один раз OnCompleted, когда получение данных завершено.
- Метод DataArrivedForPositions вызывается дважды, как и ожидалось.
- Метод PositionQueryCompleted никогда не вызывается, хотя внутри моего сервиса данных вызываетсяObserver.OnCompleted().
Код для dataService:
protected override void Request(Request request, IObserver<Response> observer)
{
query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
query.Error += (type, s, message) => QueryError(observer, message);
query.NoMoreData += id => QueryCompleted(observer);
query.Execute(request);
}
private void QueryError(IObserver<PositionSheetResponse> observer, string message)
{
observer.OnError(new Exception(message));
}
private void QueryCompleted(IObserver<PositionSheetResponse> observer)
{
observer.OnCompleted();
}
private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
{
observer.OnNext(ConvertToResponse(requestId, receiveData));
}