Пытаюсь реализовать приложение, управляющее камерой. Команды камеры представлены в виде потока объектов CameraAction:
sealed trait CameraMessage
case object Record(recordId: String) extends CameraMessage
case object Stop extends CameraMessage
...
val s = Stream[F, CameraMessage]
Допустим, у меня есть тестовый поток, который испускает «Запись» и выдает «Стоп» через 20 секунд, еще через 20 секунд выдается другое сообщение «Запись» и так далее, входной поток бесконечен.
Затем приложение потребляет «Запись», оно должно создать экземпляр конвейера GStreamer (т.е. это эффект) и «запустить» его, при «Стоп» оно «останавливает» конвейер и закрывает его. Затем при последующей «записи» шаблон повторяется с новым конвейером GStreamer.
Проблема в том, что мне нужно передать экземпляр нечистого изменяемого объекта между дескрипторами потоковых событий.
В документации по FS2 предлагается использовать фрагменты для сохранения состояния потока, поэтому я попробовал
def record(gStreamerPipeline: String, fileName: String)
(implicit sync: Sync[F]): F[Pipeline] =
{
... create and open pipeline ...
}
def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
... stop pipeline, release resources ...
}
def effectPipe(pipelineDef: String)(implicit L: Logger[F]):
Pipe[F, CameraMessage, F[Unit]] = {
type CameraSessionHandle = Pipeline
type CameraStream = Stream[F, CameraSessionHandle]
s: Stream[F, CameraMessage] =>
s.scanChunks(Stream[F, CameraSessionHandle]()) {
case (s: CameraStream, c: Chunk[CameraMessage]) =>
c.last match {
case Some(Record(fileName)) =>
(Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
case Some(StopRecording) =>
(Stream.empty, Chunk(s.compile.drain))
case _ =>
(s, Chunk.empty)
}
}
}
Проблема с этим кодом в том, что фактическая запись не происходит при событии «Запись», а оценивается эффект всего фрагмента, т.е. когда приходит сообщение «StopRecording», камера включается, а затем сразу же выключается.
Как я могу передать «состояние» без фрагментов? Или есть другой способ добиться нужного мне результата?
Это может быть похоже на поток FS2 с StateT [IO, _, _], периодически dumping state, но разница в том, что в моем случае состояние - это не чистая структура данных, а ресурс.