Неожиданный сюрприз, у меня есть несколько проблем с Iteratees и обработкой ошибок.
Проблема;
Прочтите несколько байтов из InputStream
(из сети, должно быть InputStream), сделайте несколько фрагментов / группировок в этом InputStream (для распределения работы), а затем преобразование, чтобы превратить это в case class DataBlock(blockNum: Int, data: ByteString)
для отправки акторам (преобразованный массив [байтов] в CompactByteString).
Поток;
InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors
Код;
class IterateeTest {
val actor: ActorRef = myDataBlockRxActor(...)
val is = new IntputStream(fromBytes...)
val source = Enumerator.fromStream(is)
val chunker = Traversable.takeUpTo[Array[Byte]](1000)
val transform:Iteratee[Array[Byte], Int] = Iteratee.fold[Array[Byte],Int](0) {
(bNum, bytes) => DataBlock(bNum, CompactByteString(bytes)); bNum + 1
}
val fut = source &> chunker |>> transform
}
case class DataBlock(blockNum: Int, data: CompactByteString)
Вопрос;
Мой текущий код Iteratee работает хорошо. Однако я хочу иметь возможность обрабатывать сбои на любой стороне;
- Когда метод InputStream
read
дает сбой - я хочу знать, сколько байтов / блок было успешно обработано, и возобновить чтение потока с этой точки. При чтении в Enumerator выдается ошибка,fut
просто возвращает исключение, состояния нет, поэтому я не знаю, в каком блоке я нахожусь, если я не передаю его субъекту rxing (что я не хочу делать) - Если выходная сторона не работает или больше не может получать сообщения DataBlock, потому что буфер Актера полностью удерживает чтение из входного потока
Как мне это сделать?
Как я мог бы лучше попробовать это, используя итерации reactive-streams / Akka-stream (экспериментальный) или scalaz поверх итераций Play, потому что мне нужна определенная обработка ошибок?
var
- person NightWolf   schedule 17.06.2014