Как сериализовать Observables в облако и обратно

Мне нужно разделить последовательность обработки (как в этом вопросе Как организовать последовательность процессоров данных с .net RX) на несколько вычислений единиц в среде Azure.
Идея состоит в том, чтобы сериализовать последовательность Observable в очереди Azure (или служебную шину) и десериализовать ее обратно.
Если производитель или потребитель терпит неудачу, другая сторона должна иметь возможность продолжать производство/потребление.

Может ли кто-нибудь предложить элегантный способ сделать это и что использовать (Azure Queues или Service Bus)?
Кто-нибудь использовал поставщика TCP Observable - http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider для таких проблем безопасен ли отказ одной из сторон ?


person arena-ru    schedule 09.11.2012    source источник
comment
Можете ли вы переименовать этот вопрос? Похоже, вы спрашиваете не о том, как сериализовать наблюдаемую последовательность (что наводит меня на мысль, что вы хотите сериализовать запрос --> IQbservable), а о значениях наблюдаемой последовательности. Слишком подвесной? Кстати, я думаю, что чтение/запись из Queues в любом случае лучше подходит для IEnumerable.   -  person Lee Campbell    schedule 14.11.2012


Ответы (2)


Представьте, что у вас есть очередь сообщений со следующим API

class MQ {

    public MQ();

    // send a single message from your message queue
    public void send(string keyPath, string msg);

    // Receive a single message from your message queue
    public async Task<string> receive(keyPath);

}

Чтобы сделать этот RX совместимым

class MQRX: IObserver<string> {
    MQ _mq;
    string _keyPath

    MQRX(string keyPath){
        _mq = mq;
        _keyPath = keyPath;
    }

    IObservable<string> Observe(){
        return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
    }

    void OnNext(string msg){
        _mq.send(msg);
    }

    void OnError(Exception e){
        // The message queue might not
        // support serializing exceptions
        // or it might or you might build
        // a protocol for it.
    }
}

Использовать его отказоустойчивым способом. Примечание Повторная подписка будет повторной, если возникнет исключение, вызванное восходящим потоком, доставленным OnError.

new MQRX("users/1/article/2").
    Retry().
    Subscribe((msg)=>Console.Writeln(msg));

Например, на стороне записи вы можете отправлять сообщение каждые две секунды и повторять подписку на генератор в случае ошибки. Обратите внимание, что в Observable.Interval вряд ли будет ошибка, которая просто генерирует сообщение каждый временной интервал, но представьте себе чтение из файла или какой-либо другой очереди сообщений.

var mq = new MQRX("users/1/article/2");

Observable.Interval(TimeSpan.FromSeconds(2)).
    Select((x)=>x.ToString()).

Обратите внимание, что вам, вероятно, следует использовать метод расширения IObservable Catch, а не повторять попытки вслепую, поскольку вы можете снова и снова получать одну и ту же ошибку. Повторить(). Подписаться(мкв);

person bradgonesurfing    schedule 13.11.2012

Я написал свою собственную оболочку UDP для RX в 30 строк кода VB, и она обрабатывает тайм-ауты. Обертки TCP будут похожими, я думаю.

Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Reactive.Threading.Tasks
Imports System.Threading
Imports System.Net
Imports System.Net.Sockets

Public Module UDP
    ''' <summary>
    ''' Generates an IObservable as a UDP stream on the IP endpoint with an optional
    ''' timeout value between results.
    ''' </summary>
    ''' <param name="timeout"></param>
    ''' <returns></returns>
    ''' <remarks></remarks>
    Public Function StreamObserver(
                localPort As Integer, 
                Optional timeout As Nullable(Of TimeSpan) = Nothing
                ) As IObservable(Of UdpReceiveResult)

        Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)(
            Function() New UdpClient(localPort),
            Function(udpClient)
                Dim o = Observable.Defer(Function() udpClient.
                                                                ReceiveAsync().
                                                                ToObservable())
                If Not timeout Is Nothing Then
                    Return Observable.Timeout(o.Repeat(), timeout.Value)
                Else
                    Return o.Repeat()
                End If
            End Function
        )
    End Function
End Module

Вы можете сделать то же самое для стороны записи, если вам нужно. Затем вы просто используете обычные методы RX для сериализации ваших данных в кадры UDP.

new UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)).
        Select( function(udpResult) MyDeserializeFunction(udpResult)).
        Subscribe( sub (result) DoSomething(result), 
                   sub(error) DoSomethingWithError ) 
person bradgonesurfing    schedule 12.11.2012
comment
Как насчет того, если производитель или потребитель потерпит неудачу, другая сторона должна иметь возможность продолжать производить/потреблять часть? - person arena-ru; 13.11.2012
comment
Вы хотите взглянуть на метод расширения Catch для IObservable, который позволяет возобновить работу Observable, завершившейся сбоем. Однако вы должны спросить себя, является ли промежуточный MessageBus постоянной очередью? Например, если это UDP, и производитель теряет соединение, а затем возобновляет его, он потеряет пакеты, если производитель тем временем передал их. Если вы используете постоянную очередь сообщений, то в зависимости от вашей настройки вы можете восстановиться после ошибок в потребителе/производителе и не потерять сообщения. - person bradgonesurfing; 13.11.2012
comment
Пример с постоянной очередью сообщений - это то, что я ищу. Однако примеры UDP и Tcp хороши для начала. - person arena-ru; 13.11.2012
comment
Я добавил новый ответ с воображаемым API очереди сообщений. Замените на ваш любимый API очереди сообщений - person bradgonesurfing; 13.11.2012