Объединить большое количество наблюдаемых в новые наблюдаемые

У меня есть, скажем, 1000 наблюдаемых. Теперь я хочу объединить все события в новую наблюдаемую, которая запускает OnNext, как только все остальные отправят событие. Как лучше всего это сделать с помощью Rx?

Обновление: несколько отличных отзывов на форуме Rx, особенно от Дейва Секстона. Он показал, как создать метод расширения Zip, который принимает несколько наблюдаемых: /" rel="nofollow">http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/


person lukebuehler    schedule 03.12.2010    source источник
comment
Все ли типы 1000 наблюдаемых одинаковы? Что вы представляете для наблюдаемого агрегирования?   -  person DoctorFoo    schedule 03.12.2010
comment
Все 1000 наблюдаемых относятся к одному типу, новый агрегат может быть нового типа. Например. Событие становится AggregateEvent.   -  person lukebuehler    schedule 03.12.2010
comment
Вы хотите объединить только их последние значения? т.е. если Observable a запускает два события, а Observable b запускает только одно, хотите ли вы агрегировать первое событие из a или последнее событие из a с событием b?   -  person DoctorFoo    schedule 03.12.2010
comment
@Richard Hein Давайте начнем с того, что возьмем последнее событие из a, поскольку мы можем предположить, что они поступают по порядку, то есть все 1000 событий поступают до того, как некоторые наблюдаемые запустят новые события. НО, конечно, было бы гораздо лучше, если бы они были соединены так, чтобы первое событие a прибывало с первым событием b.   -  person lukebuehler    schedule 03.12.2010
comment
Я думаю, вам лучше задать этот вопрос на форумах Rx.   -  person DoctorFoo    schedule 04.12.2010
comment
Вам нужен вывод после каждого значения, выдаваемого из исходных наблюдаемых, или сразу после первых значений?   -  person Richard Szalay    schedule 04.12.2010


Ответы (1)


В F# есть MailboxProcessor... Я бы использовал SynchronizationContext в C# для той же цели. Дайте мне несколько минут, и я напишу пример.

В сторону: вот мой код на F #, который делает что-то подобное... Это потребует значительно больше усилий, но все же выполнимо на C # с Rx.

open System.Diagnostics

let numWorkers = 20
let asyncDelay = 100

type MessageForMailbox =
   | DataMessage of AsyncReplyChannel<unit>
   | GetSummary of AsyncReplyChannel<unit>

let main =
   let actor =
      MailboxProcessor.Start( fun inbox ->
         let rec loop acc =
            async {
               let! message = inbox.Receive()
               match message with
               | DataMessage replyChannel -> replyChannel.Reply(); return! loop acc
               | GetSummary replyChannel -> replyChannel.Reply(); return! loop acc
            }

         loop 0 // seed for acc
      )

   let codeBlocks = [for i in 1..numWorkers -> 
                        async {
                           do! Async.Sleep asyncDelay
                           return! actor.PostAndAsyncReply DataMessage
                        } ]

   while true do
      printfn "Concurrent started..."
      let sw = new Stopwatch()
      sw.Start()
      codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
      actor.PostAndReply GetSummary
      sw.Stop()
      printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)

      printfn "Synchronous started..."
      let sw = new Stopwatch()
      sw.Start()
      for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)

main
person GregC    schedule 03.12.2010
comment
хм, так вы имеете в виду что-то вроде использования SynchronizationContext.Send() для синхронизации всех наблюдаемых, создающих события? Я вроде как вижу, что делает ваш код F #, но я недостаточно сообразителен, чтобы понять его полностью. - person lukebuehler; 03.12.2010
comment
Я думаю, ты понял. RunSynchronously реализует ForkJoin с асинхронными рабочими процессами. - person GregC; 04.12.2010
comment
+1: я никогда раньше не видел хорошего примера MailboxProcessor. :) - person Tuomas Hietanen; 20.01.2011