Превратите TCP-сокет в Observable of Array[Byte]

В моем приложении для Android мне нужно использовать Socket для отправки и получения массивов байтов. Для удобства я хочу работать с Observable, подключенным к Socket.

Глядя в Интернете, я нашел этот код:

import rx.lang.scala.Observable

val s = Observable.using[Char,Socket](new Socket("10.0.2.2", 9002))(
  socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable),
  socket => Try(socket.close))
  .subscribeOn(rx.lang.scala.schedulers.IOScheduler.apply)

  val a = s.subscribe(println, println)

Он работает, но выводит по одному символу за раз, например, при отправке строки «привет», вывод:

I/System.out: h
I/System.out: e
I/System.out: l
I/System.out: l
I/System.out: o
I/System.out:  
I/System.out: t
I/System.out: h
I/System.out: e
I/System.out: r
I/System.out: e

Но я хочу получать в свою подписку буферизованные массивы байтов. Как я могу этого добиться?


person src091    schedule 07.01.2017    source источник
comment
Когда вы хотите выпустить Bytes[]? После получения N символов, после получения \n или \r\n или после закрытия сокета? (Или что-то совсем другое?)   -  person Sean Vieira    schedule 07.01.2017
comment
@SeanVieira Я не хочу, чтобы он испускался слишком часто (для каждого полученного байта), я думал о некоторых фрагментах фиксированного размера, которые можно было бы испускать после заполнения.   -  person src091    schedule 07.01.2017
comment
@SeanVieira или, возможно, все отправленное сообщение может быть отправлено в виде однобайтового массива (я не уверен, возможно ли разделить целые сообщения в стиле веб-сокета при использовании простого сокета TCP, в любом случае это будет работать для меня).   -  person src091    schedule 07.01.2017


Ответы (1)


Как уже сказал @SeanVieira, сначала вам нужно решить, как агрегировать элементы потока, символы. Если вы знаете, что поток будет закрыт после каждого сообщения, вы можете дождаться получения всего сообщения, а затем передать последовательность на onCompleted().

Я думаю, что то, что вы реализовали до сих пор, довольно хорошо, так как это зависит от наблюдателя, что и как он хочет обрабатывать символы.

Затем вы можете, в зависимости от ваших потребностей, добавить потоковое преобразование, например. грамм.

Решение с использованием tumblingBuffer в уже созданном Observable может выглядеть так (не тестировалось):

 source.tumblingBuffer(source.filter(_ == '\n'))

где вы буферизуете все, что поступает из источника, и испускаете весь буфер, как только наблюдаемый на границе source.filter(...) испускает элемент. Затем вы можете преобразовать последовательность символов в строку, используя mkString< /a> и подпишитесь на этот Observable:

source.tumblingBuffer(source.filter(_ == '\n')).map(mkString(_))
person wtho    schedule 06.02.2017