Обработка ошибок / исключений Iteratees против реактивных потоков / akka-stream

Неожиданный сюрприз, у меня есть несколько проблем с Iteratees и обработкой ошибок.

Проблема;

Прочтите несколько байтов из InputStream (из сети, должно быть InputStream), сделайте несколько фрагментов / группировок в этом InputStream (для распределения работы), а затем преобразование, чтобы превратить это в case class DataBlock(blockNum: Int, data: ByteString) для отправки акторам (преобразованный массив [байтов] в CompactByteString).

Поток;

InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors

Код;

class IterateeTest {
  val actor: ActorRef = myDataBlockRxActor(...)
  val is = new IntputStream(fromBytes...)
  val source = Enumerator.fromStream(is)
  val chunker = Traversable.takeUpTo[Array[Byte]](1000)
  val transform:Iteratee[Array[Byte], Int] = Iteratee.fold[Array[Byte],Int](0) {
    (bNum, bytes) => DataBlock(bNum, CompactByteString(bytes)); bNum + 1
  }
  val fut = source &> chunker |>> transform
}

case class DataBlock(blockNum: Int, data: CompactByteString)

Вопрос;

Мой текущий код Iteratee работает хорошо. Однако я хочу иметь возможность обрабатывать сбои на любой стороне;

  1. Когда метод InputStream read дает сбой - я хочу знать, сколько байтов / блок было успешно обработано, и возобновить чтение потока с этой точки. При чтении в Enumerator выдается ошибка, fut просто возвращает исключение, состояния нет, поэтому я не знаю, в каком блоке я нахожусь, если я не передаю его субъекту rxing (что я не хочу делать)
  2. Если выходная сторона не работает или больше не может получать сообщения DataBlock, потому что буфер Актера полностью удерживает чтение из входного потока

Как мне это сделать?

Как я мог бы лучше попробовать это, используя итерации reactive-streams / Akka-stream (экспериментальный) или scalaz поверх итераций Play, потому что мне нужна определенная обработка ошибок?


person NightWolf    schedule 16.06.2014    source источник
comment
Есть ли способ сделать №1 без использования общего var   -  person NightWolf    schedule 17.06.2014


Ответы (1)


(1) может быть реализовано с Enumeratee.

val (counter, getCount) = {
  var count = 0
  (Enumeratee.map { x => count += 1; x },
   () => count)
}
val fut = source &> counter &> chunker |>> transform

Затем вы можете использовать getCount внутри recover или чего-то подобного на fut.

Вы получаете (2) бесплатно с Play Iteratees. Никаких дальнейших чтений не произойдет, пока Iteratee не будет готов к дополнительным данным, а в случае сбоя больше не будет чтения. InputStream автоматически закрывается, когда Enumerator завершается из-за сбоя или нормального завершения.

person wingedsubmariner    schedule 16.06.2014
comment
Спасибо, но этот код не работает. Вы не можете + = на _1 _... Я не думал, что было безопасно сделать count a var - person NightWolf; 17.06.2014
comment
Ой, я хотел написать там var. Отредактировано. var здесь безопасно, только один поток будет в теле Enumeratee одновременно. - person wingedsubmariner; 17.06.2014
comment
Спасибо! Вары также безопасны для использования из фьючерсов ... Теоретически я мог бы сделать count += 1 вне кода Enumeratee, который не кажется `` безопасным '' - person NightWolf; 17.06.2014
comment
Выполнение count += 1 вне Enumeratee во время его работы определенно небезопасно. На самом деле я использую здесь только var, потому что это самый простой способ реализовать это, но вы должны быть осторожны с тем, как вы используете var. Нет особых причин использовать var внутри future, было бы лучше включить все в возвращаемое значение future. - person wingedsubmariner; 17.06.2014
comment
К вашему сведению - Enumerator.fromStream включает в себя призыв закрыть, если вы посмотрите на источник - person NightWolf; 17.06.2014
comment
Есть ли другой способ реализовать это без общего var count? - person NightWolf; 17.06.2014
comment
Хорошее замечание о fromStream, я отредактировал его. Также я изменил код, чтобы инкапсулировать var count. - person wingedsubmariner; 17.06.2014