Перезапустите Observable, подключенный к ресурсу.

В следующем коде я превращаю сокет TCP в Observable[Array[Byte]]:

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler

val sock = new Socket
type Bytes = Array[Byte]

lazy val s: Observable[Bytes] = Obs.using[Bytes, Socket] {
  sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
  sock
}(
  socket => Observable.from[Bytes] {

    val incoming = socket.getInputStream
    val buffer = new Bytes(1024)

    Stream.continually {
      val read = incoming.read(buffer, 0, 1024)
      buffer.take(read)
    }.takeWhile(_.nonEmpty)

  },

  socket => {
    println("Socket disposed")
    socket.close
    s.retry // Does not work
  })
  .subscribeOn(IOScheduler.apply)

s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)

Соединение с удаленным сервером может быть прервано в любой момент, и в этом случае я бы хотел, чтобы Observable попытался автоматически переподключиться, но s.retry ничего не делает. Как я могу этого добиться? Также можно ли это сделать "внутри" текущего Observable без создания нового и повторной подписки?


person src091    schedule 08.01.2017    source источник


Ответы (1)


Вы хотите настроить новое подключение к сокету для каждой новой подписки. Проще всего это сделать с (A)SyncOnSubscribe, портированным на RxScala, начиная с версии 0.26.5. Если у вас есть это наблюдаемое, вы можете использовать обычные методы контроля ошибок, такие как .retry.

Что-то вроде этого:

val socketObservable: Observable[Byte] = Observable.create(SyncOnSubscribe.singleState(
  generator = () =>
    sock
      .connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
      .getInputStream
)(next = is => Try(is.read()) match {
    case Success(-1) => Notification.OnCompleted()
    case Success(byte) => Notification.OnNext(byte)
    case Failure(e) => Notification.OnError(e)
  },
  onUnsubscribe = is => Try(is.close)
)

Примечание: это считывает один байт за раз и не очень эффективно. Вы можете улучшить это с помощью ASyncOnSubscribe или сделать так, чтобы каждое событие вашего наблюдаемого объекта представляло собой массив байтов.

Примечание: это холодный наблюдаемый объект, который создаст новый сокет для каждого подписчика. Например, это откроет 2 сокета:

socketObservable.foreach(b => System.out.print(b))
socketObservable.buffer(1024).foreach(kiloByte => System.out.println(kiloByte))

Если это не то, что вы хотите, вы можете превратить это в горячее с .share

person dtech    schedule 17.01.2017