Запись элементов в файл по мере их удаления из очереди: Scala fs2 Stream

У меня есть небольшой тест потоков fs2, элементов процесса, ожидания, а затем записи их в файл. Я получаю сообщение об ошибке типа и не могу понять, что это означает:

Ошибка: required: fs2.Stream[[x]cats.effect.IO[x],Unit] => fs2.Stream[[+A]cats.effect.IO[A],Unit], found : [F[_]]fs2.Pipe[F,Byte,Unit] import java.nio.file.Paths

import cats.effect.{Blocker, ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.io
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random

class StreamTypeIntToDouble(q: Queue[IO, Int])(implicit timer: Timer[IO]) {
  import core.Processing._

  val blocker: Blocker =
    Blocker.liftExecutionContext(
      scala.concurrent.ExecutionContext.Implicits.global
    )
  def storeInQueue: Stream[IO, Unit] = {

    Stream(1, 2, 3)
      .covary[IO]
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
      .metered(Random.between(1, 20).seconds)
      .through(q.enqueue)

  }
  def getFromQueue: Stream[IO, Unit] = {
    q.dequeue
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
      .through(
        io.file
          .writeAll(Paths.get("file.txt"), blocker)
      )

  }
}

object Five extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, Int](10)
      b = new StreamTypeIntToDouble(q)
      _ <- b.storeInQueue.compile.drain.start
      _ <- b.getFromQueue.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}

person Muna Ar    schedule 18.06.2020    source источник


Ответы (1)


Здесь есть пара проблем, и первая из них наиболее сбивает с толку. writeAll полиморфен в своем контексте F[_], но для него требуется экземпляр ContextShift для F (а также Sync). В настоящее время у вас нет ContextShift[IO] в области видимости, поэтому компилятор не сделает вывод, что F для writeAll должно быть IO. Если вы добавите что-то вроде этого:

implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)

… Тогда компилятор выдаст IO, как и следовало ожидать.

Мое предложение для подобных случаев - пропустить вывод типа. Запись с параметром типа лишь немного более подробна:

  .through(
    io.file
      .writeAll[IO](Paths.get("file.txt"), blocker)
  )

… А это означает, что вы будете получать полезные сообщения об ошибках, например, об отсутствии экземпляров классов типов.

Как только вы исправите эту проблему, появится еще пара. Далее, использование evalMap в этом контексте означает, что у вас будет поток () значений. Если вы измените его на evalTap, побочные эффекты регистрации по-прежнему будут происходить соответствующим образом, но вы не потеряете фактические значения потока, для которого вы его вызываете.

Последняя проблема заключается в том, что writeAll требуется поток байтов, в то время как вы дали ему поток Int. Как вы хотите справиться с этим несоответствием, зависит от предполагаемой семантики, но для примера что-то вроде .map(_.toByte) заставит его скомпилировать.

person Travis Brown    schedule 18.06.2020
comment
спасибо за эти разъяснения. Что касается последней проблемы, что, если у меня будет String вместо Int или, может быть, тип, который я создал, например Event ... Я думаю, что преобразование в байты сработает, не так ли? - person Muna Ar; 18.06.2020
comment
В качестве строки вам понадобится getBytes из Java API. Возможно, вы захотите указать кодировку явно, поскольку, если вы вызовете s.getBytes() без аргументов, вы получите кодировку платформы по умолчанию. - person Travis Brown; 18.06.2020
comment
как насчет явного типа моего творения, такого как Событие, Операция или просто Человек? - person Muna Ar; 18.06.2020
comment
Для некоторого другого типа, который вы определили самостоятельно, вам нужно указать, как вы хотите, чтобы он был сериализован в байты. Есть много библиотек, которые могут помочь в этом (например, Scodec или Circe, если вы можете использовать JSON). Вы также можете использовать сериализацию Java, которая позволяет избежать дополнительных зависимостей, но имеет много неприятных аспектов IMO и не соответствует философии библиотек Scala, таких как fs2. - person Travis Brown; 18.06.2020