Rx.Net: вызов нескольких IObservable в SelectMany

Обратите внимание: это продолжение вопроса . выложили ранее но интересующее решение другой ситуации.

Я пытаюсь сделать несколько вызовов методов, каждый из которых возвращает IObservable, но значения, возвращаемые обратно в операторе SelectMany, являются задачей, и, следовательно, следующий оператор Subscribe не компилируется.

Это фрагмент кода

 var myWorkList = new List<MyWork>
                {
                    new MyWork(),// MyWork.Execute(data) returns IObservable
                    new MyWork()
                }.ToObservable();

 var results =
   myService
    .GetData(accountId)
    .SelectMany(data => myWorkList.ForEachAsync(r => r.Execute(data))
    .Subscribe(result =>
    {
        Console.WriteLine($"Result Id: {result.Id}");
        Console.WriteLine($"Result Status: {result.Pass}");
    });

person Gautam T Goudar    schedule 06.05.2018    source источник


Ответы (2)


Вы просто хотите использовать .SelectMany. Попробуй это:

var myWorkList = new List<MyWork>()
{
    new MyWork(),
    new MyWork()
}.ToObservable();

var query =
    from data in myService.GetData(accountId)
    from myWork in myWorkList
    from result in myWork.Execute(data)
    select result;

var results =
    query
        .Subscribe(result =>
        {
            Console.WriteLine($"Result Id: {result.Id}");
            Console.WriteLine($"Result Status: {result.Pass}");
        });

Вот мой тестовый код:

public static class myService
{
    public static IObservable<MyData> GetData(int x)
        => Observable.Return(new MyData());
}

public class MyWork
{
    public virtual IObservable<MyResult> Execute(MyData data)
    {
        return
            from isMatch in IsMatch(data)
            where isMatch
            select new MyResult() { Id = 1, Pass = true };
    }

    public IObservable<bool> IsMatch(MyData data)
    {
        return Observable.Return(true);
    }
}

public class MyResult
{
    public int Id;
    public bool Pass;
}

public class MyData { }

Когда я выполняю, я получаю это:

Result Id: 1
Result Status: True
Result Id: 1
Result Status: True

В комментариях к вашему предыдущему вопросу я предложил сделать это в виде списка делегатов. Вот как:

var myWorkList = new Func<MyData, IObservable<MyResult>>[]
{
    md => new MyWork().Execute(md),
    md => new MyWork().Execute(md),
}.ToObservable();

var query =
    from data in myService.GetData(accountId)
    from myWork in myWorkList
    from result in myWork(data)
    select result;

Вы получите тот же результат.

person Enigmativity    schedule 06.05.2018
comment
Спасибо, и это тоже работает. Спасибо, что также предоставили альтернативу. - person Gautam T Goudar; 06.05.2018

Список может быть объявлен так же, как список MyWork, без использования ToObservable здесь.

var myWorkList = new List<MyWork>
            {
                new MyWork(),// MyWork.Execute(data) returns IObservable
                new MyWork()
            };

затем мы отображаем объекты, возвращенные myService.GetData, на элементы myWorkList и принимаем их как IObservable-s.

var observables = myService
         .GetData(accountId)
         .SelectMany(data => myWorkList.Select(r => r.Execute(data)));

И теперь вы можете наблюдать за ними.

Либо вместе - слились:

var subscription =
          observables
           .Merge()
           .Subscribe(result =>
           {
               ...
           });

Или отдельно:

var subscriptions=
          observables
           .Select(obs => 
               obs.Subscribe(result =>
               {
                   ...
               }))
           .ToArray();

Обновление: последний случай должен быть немедленно реализован, чтобы предотвратить побочные эффекты (.ToArray()).

person armenm    schedule 06.05.2018
comment
Вариант «Или отдельно» ужасен. Не делайте этого .Subscribe в .Select жизни. Это ужасно. Если бы вы написали строку .SelectMany(data => myWorkList.Select(r => r.Execute(data))); как .SelectMany(data => myWorkList.SelectMany(r => r.Execute(data)));, вы избежали бы необходимости в .Merge или каких-либо причудливых внутренних подписках. - person Enigmativity; 07.05.2018
comment
@Enigmativity, я обновил свой ответ, чтобы сделать его более безопасным. В случае слияния у вас есть возможность иметь только одну подписку. Во многих случаях это намного полезнее, чем иметь много отдельных. - person armenm; 07.05.2018
comment
Во многих случаях? В любом случае. Вложенные подписки ужасны. И что вы имеете в виду под последним случаем, которое должно быть немедленно реализовано во избежание побочных эффектов? - person Enigmativity; 07.05.2018
comment
В этом нет ничего плохого - то, как я написал, будет работать идеально. У меня есть IEnumerable из IObservable-ов, которые я конвертирую в массив подписок (IDisposables). Пожалуйста, если вы используете такие слова, как ужасный, будьте так любезны объяснить, что именно вы имеете в виду. Во-первых, семантически, а во-вторых, синтаксически. - person armenm; 07.05.2018
comment
@Enigmativity, .SelectMany(data => myWorkList.SelectMany(r => r.Execute(data))) не работает, поскольку компилятор не может вывести типы для SelectMany. Но я согласен с тем, что SelectMany работает так же, как вызовы Select и Merge. - person Gautam T Goudar; 07.05.2018
comment
@GautamTGoudar - я только что протестировал .SelectMany(data => myWorkList.SelectMany(r => r.Execute(data))), и он отлично работает. - person Enigmativity; 08.05.2018
comment
@Загадочность. Да, это работает. Для этого мне пришлось сделать myWorkList наблюдаемым. var myWorkList = новый список‹MyWork› { new MyWork(), new MyWork() }.ToObservable(); - person Gautam T Goudar; 08.05.2018