Почему ParallelQuery‹T›.Where не работает при преобразовании в Observable?

У меня есть наблюдаемая коллекция, которую я хочу обрабатывать параллельно, затем наблюдать за обработанными значениями при фильтрации и, наконец, подписывать обработчик, который получает отфильтрованные значения.

Мой образец синтаксически верен и компилируется просто отлично, и когда я запускаю код, оценивается оператор Where, выполняющий фильтрацию. Но в подписку не приходят данные. Если я уберу AsParallel, чтобы обработка выполнялась поверх обычного IEnumerable, данные поступают, и все работает как положено.

Вот мой пример, выполняющий некоторую обработку строк:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

Следующая странная вещь заключается в том, что если я использую оператор TakeWhile, который, на мой взгляд, концептуально похож на Where, наблюдение за тем, как ParallelQuery работает, как и ожидалось:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Добавление некоторого кода ведения журнала в подписку показывает, что данные получены до преобразования ToObservable, но не после:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

Точка останова в лямбде в строке 4 срабатывает, а точка останова в лямбде в строке 6 никогда не срабатывает.

Почему TakeWhile передает данные подписчику, а Where нет?

Если это важно, я разрабатываю свой код в Visual Studio 2010 RC с проектом, ориентированным на клиентский профиль .Net 4.0 Framework.

Обновление: на основе Ответ @Sergeys Я переработал размещение фильтра Where. Следующий код работает, как и ожидалось:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

Все еще кажется немного неудобным сначала преобразовать начальный наблюдаемый processedStrings в перечисляемый, чтобы распараллелить его, а затем преобразовать его обратно в наблюдаемый, чтобы подписаться на окончательный результат.


person Peter Lillevold    schedule 18.02.2010    source источник
comment
Другое исправление может заключаться в использовании .AsOrdered непосредственно перед .ToObservable - не пробовал.   -  person Sergey Aldoukhov    schedule 19.02.2010
comment
.AsParallel().Where(...).AsOrdered().ToObservable()... завершается с ошибкой: System.InvalidOperationException: AsOrdered may only be called on the result of AsParallel, ParallelEnumerable.Range, or ParallelEnumerable.Repeat. Кажется, что для фильтрации после распараллеливания единственным вариантом является использование .TakeWhile .   -  person Peter Lillevold    schedule 19.02.2010


Ответы (2)


Из C# 4.0 в двух словах:


В настоящее время существуют некоторые практические ограничения на то, что PLINQ может распараллеливать. Эти ограничения могут быть ослаблены с последующими пакетами обновлений и версиями Framework. Следующие операторы запроса предотвращают распараллеливание запроса, если исходные элементы не находятся в исходной позиции индексации:

  • Взять, взять во время, пропустить и пропустить во время
  • Индексированные версии Select, SelectMany и ElementAt

Большинство операторов запросов изменяют позицию индексации элементов (включая те, которые удаляют элементы, такие как Where). Это означает, что если вы хотите использовать предыдущие операторы, они обычно должны быть в начале запроса.


Таким образом, использование TakeWhile предотвращает распараллеливание .AsParallel(). Трудно сказать, почему Where убивает подписку, но размещение перед AsParallel может решить проблему.

person Sergey Aldoukhov    schedule 18.02.2010
comment
Вы имеете в виду, что .TakeWhile, используемый до .AsParallel, предотвратит распараллеливание? Я немного смущен тем, как это объясняет то, что я вижу. В моем образце, насколько я могу судить, ни TakeWhile, ни Where не нарушают распараллеливающую часть (данные всегда достигают лямбды Where). С другой стороны, .Where каким-то образом предотвращает дальнейшую передачу данных наблюдаемым. - person Peter Lillevold; 18.02.2010
comment
Нет, наоборот, использование TakeWhile перед AsParallel не помешало бы распараллеливанию. Виной всему то, что AsObservable почему-то (не знаю почему) не работает с параллельными данными, поступающими в пайплайн. - person Sergey Aldoukhov; 18.02.2010

TakeWhile концептуально не эквивалентно Where, потому что это зависит от порядка. Я подозреваю, что запрос фактически выполняется последовательно (см. это сообщение в блоге). Попробуйте вызвать .WithExecutionMode(ParallelExecutionMode.ForceParallelism) в своем примере TakeWhile, и я подозреваю, что вы увидите тот же результат.

Я не знаю, почему это не работает в параллельном случае... могу ли я предложить вам внести некоторые записи в журнал, чтобы увидеть, как далеко распространяются данные? Вы можете выполнить полезную регистрацию с помощью Select, который возвращает исходный элемент, например, после его регистрации.

person Jon Skeet    schedule 18.02.2010
comment
Хм, ну, если бы он выполнялся последовательно, возможно, он сработал. Именно введение AsParallel() останавливает обработку. Добавление WithExecutionMode не меняет поведение: данные по-прежнему не проходят при фильтрации с помощью Where. Я попытаюсь добавить еще несколько журналов, но точка останова в лямбде Where показывает, что данные по крайней мере достигают оператора Where... - person Peter Lillevold; 18.02.2010
comment
@Peter: Понижение было одним из серии понижений примерно в одно и то же время, так что, вероятно, просто кто-то затаил обиду. Идея WithExecutionMode(...) заключалась в том, чтобы показать, что с TakeWhile также не получается, а не заставить работать Where. Возможно, вы захотите поместить операцию регистрации непосредственно перед ToObservable(), чтобы увидеть, что на самом деле получает наблюдаемый объект. Кроме того, вероятно, стоит изучить, есть ли какой-то способ распараллелить саму наблюдаемую — кажется странным прыгать между двумя моделями вот так. - person Jon Skeet; 18.02.2010
comment
Добавлено логирование, которое только подтверждает факт: данные принимаются вплоть до окончательного преобразования в observable. - person Peter Lillevold; 18.02.2010