Сжатие Rx IObservable с бесконечным набором чисел

У меня есть IObservable [именованные строки в приведенном ниже примере] из среды расширений Reactive, и я хочу добавить номера индексов к каждому объекту, который он наблюдает.

Я попытался реализовать это с помощью функции Zip:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index })
    .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

.. но, к сожалению, это бросает

ArgumentOutOfRangeException: указанный аргумент находится вне диапазона допустимых значений. Название параметра: одноразовые

Я неправильно понимаю функцию Zip или проблема в моем коде?

Часть кода Range не кажется проблемой, и IObservable еще не получает никаких событий.


person Toni Kielo    schedule 24.03.2010    source источник
comment
Я не получаю это исключение... Что такое тип строк, IEnumerable, List, IObservable? Я разместил свой код, потому что комментарий не может его принять ... чего не хватает в моем, что заставляет ваш выбрасывать?   -  person DoctorFoo    schedule 28.03.2010


Ответы (3)


.Select имеет перегрузку для включения индекса:

rows.Select((row, index) => new { row, index });
person Sergey Aldoukhov    schedule 25.04.2010

По-видимому, методы расширения Zip преобразуют исходный пользовательский IObservable в анонимный наблюдаемый, а подписка на него создает System.Collections.Generic.AnonymousObserver, который не реализует IDisposable. Таким образом, вы не можете реализовать метод Subscribe обычным способом (по крайней мере, так, как я его видел), т.е.

public IDisposable Subscribe(IObserver<T> observer) {
  // ..add to observer list..
  return observer as IDisposable
}

Вероятнее всего, правильным ответом будет:

return Disposable.Create(() => Observers.Remove(observer));

Однако вы должны отметить, что коллекция, вероятно, будет изменена во время выполнения метода Completed, поэтому создайте копию списка перед их обработкой:

public void Completed()
{
    foreach (var observer in Observers.ToList())
    {
        observer.OnCompleted();
    }
 }
person Toni Kielo    schedule 25.03.2010

Я не уверен, в чем ваша проблема, работает ли это для вас (и чего здесь не хватает, что вы делаете?):

    static void Main(string[] args)
    {
        var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
        rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
            new { Row = row, Index = index })
            .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

        Console.ReadLine();
    }
    static void ProcessRow(int row, int index) {
        Console.WriteLine("Row {0}, Index {1}", row, index);
    }
    static void Completed() {
    }
person DoctorFoo    schedule 28.03.2010
comment
Проблема была с моим созданием отписки IDisposable. Я тупо скопировал/вставил образец откуда-то, что не подходит для моего случая. Как упоминалось выше, я должен был использовать функцию Disposable.Create, но способ, которым я ее использовал, вернул нулевой IDisposable, и поэтому он выдал странное исключение. - person Toni Kielo; 29.03.2010