У меня есть наблюдаемая коллекция, которую я хочу обрабатывать параллельно, затем наблюдать за обработанными значениями при фильтрации и, наконец, подписывать обработчик, который получает отфильтрованные значения.
Мой образец синтаксически верен и компилируется просто отлично, и когда я запускаю код, оценивается оператор Where
, выполняющий фильтрацию. Но в подписку не приходят данные. Если я уберу AsParallel
, чтобы обработка выполнялась поверх обычного IEnumerable
, данные поступают, и все работает как положено.
Вот мой пример, выполняющий некоторую обработку строк:
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
Следующая странная вещь заключается в том, что если я использую оператор TakeWhile
, который, на мой взгляд, концептуально похож на Where, наблюдение за тем, как ParallelQuery работает, как и ожидалось:
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
Добавление некоторого кода ведения журнала в подписку показывает, что данные получены до преобразования ToObservable
, но не после:
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
Точка останова в лямбде в строке 4 срабатывает, а точка останова в лямбде в строке 6 никогда не срабатывает.
Почему TakeWhile
передает данные подписчику, а Where
нет?
Если это важно, я разрабатываю свой код в Visual Studio 2010 RC с проектом, ориентированным на клиентский профиль .Net 4.0 Framework.
Обновление: на основе Ответ @Sergeys Я переработал размещение фильтра Where
. Следующий код работает, как и ожидалось:
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
Все еще кажется немного неудобным сначала преобразовать начальный наблюдаемый processedStrings
в перечисляемый, чтобы распараллелить его, а затем преобразовать его обратно в наблюдаемый, чтобы подписаться на окончательный результат.
System.InvalidOperationException: AsOrdered may only be called on the result of AsParallel, ParallelEnumerable.Range, or ParallelEnumerable.Repeat.
Кажется, что для фильтрации после распараллеливания единственным вариантом является использование .TakeWhile . - person Peter Lillevold   schedule 19.02.2010