Представьте, что у вас есть очередь сообщений со следующим API
class MQ {
public MQ();
// send a single message from your message queue
public void send(string keyPath, string msg);
// Receive a single message from your message queue
public async Task<string> receive(keyPath);
}
Чтобы сделать этот RX совместимым
class MQRX: IObserver<string> {
MQ _mq;
string _keyPath
MQRX(string keyPath){
_mq = mq;
_keyPath = keyPath;
}
IObservable<string> Observe(){
return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
}
void OnNext(string msg){
_mq.send(msg);
}
void OnError(Exception e){
// The message queue might not
// support serializing exceptions
// or it might or you might build
// a protocol for it.
}
}
Использовать его отказоустойчивым способом. Примечание Повторная подписка будет повторной, если возникнет исключение, вызванное восходящим потоком, доставленным OnError.
new MQRX("users/1/article/2").
Retry().
Subscribe((msg)=>Console.Writeln(msg));
Например, на стороне записи вы можете отправлять сообщение каждые две секунды и повторять подписку на генератор в случае ошибки. Обратите внимание, что в Observable.Interval вряд ли будет ошибка, которая просто генерирует сообщение каждый временной интервал, но представьте себе чтение из файла или какой-либо другой очереди сообщений.
var mq = new MQRX("users/1/article/2");
Observable.Interval(TimeSpan.FromSeconds(2)).
Select((x)=>x.ToString()).
Обратите внимание, что вам, вероятно, следует использовать метод расширения IObservable Catch, а не повторять попытки вслепую, поскольку вы можете снова и снова получать одну и ту же ошибку. Повторить(). Подписаться(мкв);
person
bradgonesurfing
schedule
13.11.2012