Уведомление асинхронного субъекта от другого наблюдаемого с использованием Rx .NET

Я пытаюсь уведомить слушателей, которые подписались на Subject _sub из другого наблюдаемого, и после этого записать какое-то сообщение в обработчик Do. Я вызываю OnNext, и все бы работало нормально, если бы _sub не было асинхронным. Проблема здесь в том, что нет функции OnNextAsync, которую я бы ждал в первом наблюдаемом. Как лучше всего это сделать?

 class Program
        {
            private static Subject<int> _sub = new Subject<int>();

            static void Main(string[] args)
            {
                _sub.SelectMany(async _ =>
                {
                    Console.WriteLine("SUB START: " + _);
                    await Task.Delay(3000);
                    Console.WriteLine("SUB END: " + _);
                    return 1;
                }).Subscribe();


                Start();
            }

            public static void Start()
            {
                int count = 0;
                Observable.Interval(TimeSpan.FromSeconds(5)).Select(x =>
                {
                    Console.WriteLine("START INTERVAL");
                    _sub.OnNext(count++); //onNext is not awaitable
                    Console.WriteLine("END INTERVAL");
                    return 1;
                })
                .Do(_ => Console.WriteLine("ALL FINISHED"))
                .Subscribe();

                Console.WriteLine("READLINE");
                Console.ReadLine();
            }

        }

Результат:

READLINE
START INTERVAL
SUB START: 0
END INTERVAL
ALL FINISHED
SUB END: 0

Ожидаемый результат:

READLINE
START INTERVAL
SUB START: 0
SUB END: 0
END INTERVAL
ALL FINISHED

person MistyK    schedule 09.10.2018    source источник
comment
Наблюдаемое не должно полагаться на поведение своих наблюдателей.   -  person Paulo Morgado    schedule 10.10.2018
comment
@PauloMorgado, что ты предлагаешь?   -  person MistyK    schedule 10.10.2018
comment
Вас может заинтересовать метод ForEachAsync для наблюдаемых, который выполняет асинхронное действие для каждого значения последовательности и может ожидать завершения самой последовательности и всех асинхронных действий. Вы можете найти реализацию здесь.   -  person Theodor Zoulias    schedule 29.11.2020


Ответы (2)


Наблюдаемое не должно полагаться на поведение своих наблюдателей.

Я предлагаю вам переосмыслить все это. То, что вы делаете, выглядит более интерактивным, чем реактивным.

person Paulo Morgado    schedule 10.10.2018
comment
Я не согласен. Это всего лишь простой сценарий, в котором я хочу регистрировать сообщение до и после того, как я уведомил своих наблюдателей. Единственное отличие состоит в том, что наблюдатель может быть асинхронным. В любом случае это должен быть комментарий, а не ответ. - person MistyK; 10.10.2018
comment
(Это должен был быть комментарий, но я все еще думаю, что это ответ) - person Paulo Morgado; 10.10.2018
comment
Но вы записываетесь до и после того, как уведомите своих наблюдателей. Просто наблюдатель начал что-то делать после того, как был уведомлен об этом. - person Paulo Morgado; 10.10.2018

Я повторю Пауло для ясности:

  • Наблюдаемые объекты не должны заботиться об наблюдателях. В то время как Observable будет ждать своих синхронных Observers, лучше всего думать об этом как о несчастном случае реализации. Наблюдаемый вообще не ожидает асинхронного наблюдателя. В любом случае, на ожидание не стоит полагаться.

  • Вам действительно следует переосмыслить то, как вы это делаете. Вы используете реактивную библиотеку для написания интерактивного кода. Вероятно, либо неправильный выбор инструмента, либо неправильное использование инструмента.

  • Ваш код замусорен Rx code-smells. Думайте о реактивном коде как о блок-схеме. Блок-схема вашего кода будет выглядеть как спагетти. Это должно больше походить на бинарное дерево.

Это похоже на проблему XY: я бы предложил перефразировать ваш вопрос, указав, чего вы пытаетесь достичь.

person Shlomo    schedule 11.10.2018
comment
Это не проблема XY. У меня есть реальный сценарий, вышеприведенная версия просто упрощена. Почему вы говорите, что это интерактивный код? Вся идея заключается в том, что у меня есть несколько наблюдаемых (например, данные изменились), которые могут уведомить вторую наблюдаемую (например, для каждого изменения данных проверьте, может ли процесс начаться) - person MistyK; 11.10.2018