В следующем коде я превращаю сокет TCP в Observable[Array[Byte]]
:
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
val sock = new Socket
type Bytes = Array[Byte]
lazy val s: Observable[Bytes] = Obs.using[Bytes, Socket] {
sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
sock
}(
socket => Observable.from[Bytes] {
val incoming = socket.getInputStream
val buffer = new Bytes(1024)
Stream.continually {
val read = incoming.read(buffer, 0, 1024)
buffer.take(read)
}.takeWhile(_.nonEmpty)
},
socket => {
println("Socket disposed")
socket.close
s.retry // Does not work
})
.subscribeOn(IOScheduler.apply)
s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)
Соединение с удаленным сервером может быть прервано в любой момент, и в этом случае я бы хотел, чтобы Observable
попытался автоматически переподключиться, но s.retry
ничего не делает. Как я могу этого добиться? Также можно ли это сделать "внутри" текущего Observable
без создания нового и повторной подписки?