Zio run блокирует обратно совместимый код

(Надеюсь) простой вопрос о Скалазе Зио.

У меня есть старый код, который я переделал на Zio. Я хочу, чтобы один путь этого кода продолжал вести себя точно так же, как и был:

  • синхронный
  • блокировка
  • в текущем потоке (это жесткое требование)

Как я могу запустить IO, чтобы он вел себя как старый код блокировки?

В настоящее время я использую:

  private lazy val blockingRts = new RTS {}
  def runBlocking[E, A](io: IO[E, A]): Either[E, A] = {
    blockingRts.unsafeRun(io.attempt)
  }

Кажется, это помогает, но я далеко не уверен, что это правильно. Это на 100% обратно совместимо со старым кодом?


person Lodewijk Bogaards    schedule 27.03.2019    source источник
comment
Особенно требование, чтобы он работал в текущем потоке, мне кажется очень сомнительным. Это похоже на фрагмент кода, который - с функциональной точки зрения - абсолютно не принадлежит IO. Но это всего лишь мое чутье.   -  person Markus Appel    schedule 01.04.2019
comment
Помимо этого - может быть, вы ищете Sync?   -  person Markus Appel    schedule 01.04.2019


Ответы (1)


Хорошо, я наконец заглянул под капот и реализовал кое-что, что, похоже, соответствует моим требованиям:

  /**
    * Executes the IO synchronous and blocking on the current thread, thus running an IO
    * without any of the advantages of IO. This can be useful for maintaining backwards compatibility.
    * Rethrows any exception that was not handled by the IO's error handling.
    */
  @throws
  def runLegacy[E, A](io: IO[E, A]): Either[E, A] = {
    syncBlockingRunTimeSystem.unsafeRunSync[Nothing, Either[E, A]](io.either) match {
      case Exit.Success(v) => v
      case Exit.Failure(Cause.Die(exception)) => throw exception
      case Exit.Failure(Cause.Interrupt) => throw new InterruptedException
      case Exit.Failure(fail) => throw FiberFailure(fail)
    }
  }

  private lazy val syncBlockingRunTimeSystem = Runtime(
    (),
    PlatformLive.fromExecutor(new Executor {
      override def yieldOpCount: Int = Int.MaxValue
      override def metrics: Option[ExecutionMetrics] = None
      override def submit(runnable: Runnable): Boolean = {
        runnable.run()
        true
      }
      override def here: Boolean = true
    })
  )

Еще я написал пару тестов:

  "runLegacy" should {
    "run synchronous code in blocking fashion on current thread" in {
      var runCount = 0
      val io = IO.succeedLazy { runCount += 1 }
        .map { _ => runCount +=1 }
        .flatMap { _ =>
          runCount += 1
          IO.effect {
            runCount += 1
            Thread.currentThread()
          }
        }

      runCount shouldBe 0
      runLegacy(io) shouldBe Right(Thread.currentThread())
      runCount shouldBe 4
    }

    "run parallel code sequentially on current thread" in {
      val ios = (1 to 500).map { i => IO.succeedLazy { i } }
      runLegacy(IO.reduceAll(IO.succeed(0), ios) {
        case (a, b) => a + b
      }) shouldBe Right((500 * 501) / 2)
    }

    "run many flatMaps without overflowing" in {
      var runCount = 0
      val io = IO.succeedLazy { runCount += 1 }
      val manyIo = (1 to 9999).foldLeft(io) { case (acc, _) => acc.flatMap { _ => io } }
      runLegacy(manyIo)
      runCount shouldBe 10000
    }

    case object TestException extends Throwable

    "handle sync blocking errors" in {
      case object TestException extends Throwable
      runLegacy(IO.effect(throw TestException)) shouldBe Left(TestException)
    }

    "rethrow unhandled exceptions" in {
      assertThrows[TestException.type] {
        runLegacy(IO.succeedLazy(throw TestException))
      }
    }
  }
person Lodewijk Bogaards    schedule 01.04.2019