Как создать Async[Future] из Async[IO]

Я пытаюсь неявно добавить Async и Sync в свой код для репозитория doobie. Sync и Async[F] отлично работают с вводом-выводом. Я хочу преобразовать их в будущее и столкнуться с проблемой

Я попытался создать свой собственный Aync из IO

def futureAsync(implicit F: MonadError[Future, Throwable]): Async[Future] = new Async[Future] {
    override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] = IO.async(k).unsafeToFuture()

    override def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] =
      throw new Exception("Not implemented Future.asyncF")

    override def suspend[A](thunk: => Future[A]): Future[A] = thunk

    override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(release: (A, ExitCase[Throwable]) => Future[Unit]): Future[B] =
      throw new Exception("Not implemented Future.bracketCase")

    override def raiseError[A](e: Throwable): Future[A] = F.raiseError(e)

    override def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] = F.handleErrorWith(fa)(_ => f(new Exception("")))

    override def pure[A](x: A): Future[A] = F.pure(x)

    override def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = F.flatMap(fa)(f)

    override def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = F.tailRecM(a)(f)
  }

Я поражен реализацией двух функций там asyncF и скобкиCase Может ли кто-нибудь помочь?


person user11034858    schedule 03.05.2019    source источник
comment
Я думаю, что это просто невозможно, потому что Sync означает синхронную приостановку. И Future не может приостанавливаться, он выполняется немедленно. Таким образом, вы можете написать эти экземпляры, они будут проверять тип, но полученный Async будет иметь неправильную семантику. typelevel.org/cats-effect/typeclasses   -  person Reactormonk    schedule 03.05.2019


Ответы (1)


Как Reactormonk говорит в комментарии выше, невозможно написать экземпляр Async для Future с правильной семантикой, потому что Async расширяет Sync, а Sync требует представления вычислений, которые можно запускать неоднократно, в то время как фьючерсы Scala начинают выполняться, когда они определены и не могут быть повторно запущены.

Незаконный случай

Тем не менее, полезно убедиться в этом самостоятельно, и я бы посоветовал вам попытаться написать свой собственный компилируемый, но (обязательно) недопустимый экземпляр Async[Future], не глядя на следующий блок кода. Тем не менее, для примера, вот быстрый набросок, который пришел мне в голову:

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import cats.effect.{Async, ExitCase, IO}

def futureAsync(implicit c: ExecutionContext): Async[Future] = new Async[Future] {
  def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] =
    IO.async(k).unsafeToFuture()

  def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] = {
    val p = Promise[A]()
    val f = k {
      case Right(a) => p.success(a)
      case Left(e) => p.failure(e)
    }
    f.flatMap(_ => p.future)
  }

  def suspend[A](thunk: => Future[A]): Future[A] = Future(thunk).flatten

  def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
    release: (A, ExitCase[Throwable]) => Future[Unit]
  ): Future[B] = acquire.flatMap { a =>
    use(a).transformWith {
      case Success(b) => release(a, ExitCase.Completed).map(_ => b)
      case Failure(e) => release(a, ExitCase.Error(e)).flatMap(_ => Future.failed(e))
    }
  }

  def raiseError[A](e: Throwable): Future[A] = Future.failed(e)
  def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] =
    fa.recoverWith { case t => f(t) }

  def pure[A](x: A): Future[A] = Future.successful(x)
  def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)
  def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = f(a).flatMap {
    case Right(b) => Future.successful(b)
    case Left(a) => tailRecM(a)(f)
  }
}

Это прекрасно скомпилируется и, вероятно, будет работать в некоторых ситуациях (но, пожалуйста, не используйте его на самом деле!). Однако мы сказали, что у него не может быть правильной семантики, и мы можем показать это, используя модуль законов кошачьего эффекта.

Проверка законов

Во-первых, нам нужны шаблонные вещи, о которых вам не нужно беспокоиться:

import cats.kernel.Eq, cats.implicits._
import org.scalacheck.Arbitrary

implicit val throwableEq: Eq[Throwable] =  Eq.by[Throwable, String](_.toString)
implicit val nonFatalArbitrary: Arbitrary[Throwable] =
  Arbitrary(Arbitrary.arbitrary[Exception].map(identity))

implicit def futureEq[A](implicit A: Eq[A], ec: ExecutionContext): Eq[Future[A]] =
  new Eq[Future[A]] {
    private def liftToEither(f: Future[A]): Future[Either[Throwable, A]] =
      f.map(Right(_)).recover { case e => Left(e) }

      def eqv(fx: Future[A], fy: Future[A]): Boolean =
        scala.concurrent.Await.result(
        liftToEither(fx).zip(liftToEither(fy)).map {
          case (rx, ry) => rx === ry
        },
        scala.concurrent.duration.Duration(1, "second")
      )
  }

Затем мы можем определить тест, который проверяет законы Async для нашего экземпляра:

import cats.effect.laws.discipline.{AsyncTests, Parameters}
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline

object FutureAsyncSuite extends FunSuite with Discipline {
  implicit val ec: ExecutionContext = ExecutionContext.global

  implicit val params: Parameters =
    Parameters.default.copy(allowNonTerminationLaws = false)

  checkAll(
    "Async",
    AsyncTests[Future](futureAsync).async[String, String, String]
  )
}

И тогда мы можем запустить юридические тесты:

scala> FutureAsyncSuite.execute()
FutureAsyncSuite:
- Async.async.acquire and release of bracket are uncancelable
- Async.async.ap consistent with product + map
- Async.async.applicative homomorphism
...

Вы увидите, что большинство тестов зеленого цвета; этот экземпляр многое делает правильно.

Где это нарушает закон

Тем не менее, он показывает три неудачных теста, включая следующие:

- Async.async.repeated sync evaluation not memoized *** FAILED ***
  GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
   (Discipline.scala:14)
    Falsified after 1 successful property evaluations.
    Location: (Discipline.scala:14)
    Occurred when passed generated values (
      arg0 = "淳칇멀",
      arg1 = org.scalacheck.GenArities$$Lambda$7154/1834868832@1624ea25
    )
    Label of failing property:
      Expected: Future(Success(驅ṇ숆㽝珅뢈矉))
  Received: Future(Success(淳칇멀))

Если вы посмотрите на файл определения законов, вы увидите, что это тест, который определяет значение Future с delay, а затем упорядочивает его несколько раз, например:

val change = F.delay { /* observable side effect here */ }
val read = F.delay(cur)

change *> change *> read

Два других сбоя аналогичны «незапоминаемым» нарушениям. Эти тесты должны увидеть побочный эффект дважды, но в нашем случае невозможно написать delay или suspend вместо Future таким образом, чтобы это произошло (однако стоит попробовать, чтобы убедиться, что это так).

Что вы должны делать вместо этого

Подводя итог: вы можете написать экземпляр Async[Future], который пройдет что-то около 75 из 78 тестов по Async законам, но невозможно написать экземпляр, который пройдет их все, а использование незаконного экземпляра — очень плохая идея: и то, и другое потенциальные пользователи вашего кода и библиотек, таких как Doobie, будут считать ваши экземпляры законными, и если вы не соответствуете этому предположению, вы открываете дверь для сложных и раздражающих ошибок.

Стоит отметить, что не так сложно написать минимальную оболочку для Future, которая имеет законный экземпляр Async (например, у меня есть оболочка для будущего Twitter под названием Rerunnable в моем catbird). Тем не менее, вам действительно следует просто придерживаться cats.effect.IO и использовать предоставленные преобразования для преобразования в и из фьючерсов в любых частях вашего кода, где вы работаете с традиционными API на основе Future.

person Travis Brown    schedule 03.05.2019