Иногда нам нужно, чтобы ресурс был доступен до тех пор, пока происходит обработка некоторого потока FS2. Когда мы говорим о ресурсе, мы имеем в виду что-то, что имеет жизненный цикл, поэтому его необходимо приобретать и освобождать. Библиотека Cats Effect предлагает абстракции для этих шаблонов. Раз так, мы можем представить себе, что должен существовать простой способ компоновки ресурса с потоком, чтобы он был доступен до тех пор, пока мы выполняем обработку потока, и мы освобождаем его, когда закончим с ним. Хотя это может быть не так просто. Давайте посмотрим, как мы можем решить эту проблему.

Это пример, с которым мы будем работать:

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val path = "/tmp/output"
    val numberWriting: Stream[IO, Unit] = ???
    numberWriting.compile.drain.as(ExitCode.Success)
  }
}

В этом примере мы определяем поток numberWriting. Сразу после его определения мы преобразуем поток в IO, который истощает его содержимое и, наконец, возвращает статус Success.

Здесь numberWriting предназначен для потока FS2, который описывает запись (потенциально очень большой) последовательности чисел в файл. И здесь возникает проблема, потому что у файла есть жизненный цикл, ресурс. Нам нужно убедиться, что в случае непредвиденных проблем мы корректно закроем файл и завершим программу. Иначе мы можем все испортить 😅.

Здесь на помощь приходит API FS2. Мы рассмотрим несколько вариантов, которые у нас есть.

Использование onFinalize

Прежде всего, потоки FS2 предлагают метод onFinalize, который мы можем использовать для установки ввода-вывода. Это гарантирует, что когда мы закончим с потоком (успешно или нет), эффект, приостановленный в IO, будет работать, несмотря ни на что. В нашем конкретном случае этим эффектом будет освобождение файла. Реализация, основанная на этом подходе, будет выглядеть так:

val file = new FileWriter(path)
val numberWriting = Stream(1, 2, 3, 4, 5, 6, 7, 8, 9)
  .evalMap(number => IO(file.write(s"$number\n")))
  .onFinalize(IO(file.close()))

Однако это не идеально, потому что мы слишком быстро открываем файл. Поток numberWriting может быть предназначен для запуска намного позже. Хотя это может подойти для других сценариев, давайте рассмотрим другие удобные способы улучшить это.

Использование кронштейна

Более общая альтернатива правильной работе — использование bracket. Этот метод предлагает нам способ определить, как мы получаем ресурс и как мы его освобождаем. Отсюда мы получаем поток, содержащий ресурс, в данном конкретном случае FileWriter. Если мы сопоставим этот поток с другими, как мы делаем здесь, мы гарантируем, что ресурс будет доступен, пока мы обрабатываем поток, а затем он будет освобожден. Кроме того, ресурс будет получен только тогда, когда он нам понадобится.

val numberWriting = for {
  file   <- Stream.bracket(IO(new FileWriter(path)))(
              file => IO(file.close())
            )
  number <- Stream(1, 2, 3, 4, 5, 6, 7, 8, 9)
  _      <- Stream.eval(IO(file.write(s"$number\n")))
} yield ()

Использование ресурса

Наконец, если мы используем что-то, что уже определено как ресурс Cats Effect, или мы работаем с чем-то, что можно легко определить как таковое, это правильный выбор.

val numberWriting = for {
  file   <- Stream.resource(
              Resource.fromAutoCloseable(IO(new FileWriter(path)))
            )
  number <- Stream(1, 2, 3, 4, 5, 6, 7, 8, 9)
  _      <- Stream.eval(IO(file.write(s"$number\n")))
} yield ()

И это почти все! Надеюсь, вы нашли это полезным.

Удачного кодирования! 🧑‍💻😄