Подписчик темы Lagom - как повторить попытку в Future Exception?

У меня есть подписчик темы в lagom, как показано ниже

fooService.fooTopic().subscribe
  .atLeastOnce(
    Flow[fooMsg].map {
      case fooMsg(_) =>
        foo()
      case a =>
        println(a)
    }.async.map{ _ =>
      Done
    }
  )

чтобы подписаться на эту тему, я использую atLeastOnce в качестве метода, поэтому, если есть какое-либо исключение, я хочу, чтобы поток был перезапущен/повторен. когда я выбрасываю обычное исключение, он может продолжать повторять попытки в обычном режиме.

  private def foo() = {
    throw new RuntimeException("testing error")
  }

но когда в будущем произойдет исключение, как бы я ни пытался, Flow не перезапустится. вот одна из моих попыток обработать исключение в будущем

  private def foo() = {
    val test: Future[Int] = Future(throw new RuntimeException("asd"))
    val result = for {
      y1 <- test
    } yield (y1)

    result.onComplete{
      case Success(value) => println("SUCCESS")
      case Failure(exception) => println(exception.getMessage)
                                 throw exception
    }
  }
  private def foo() = {
    val test: Future[Int] = Future(throw new RuntimeException("asd"))

    test.onComplete{
      case Success(value) => println("SUCCESS")
      case Failure(exception) => println(exception.getMessage)
                                 throw exception
    }
  }

он покажет исключение, но Flow не перезапустит его автоматически. Как мне обработать/сгенерировать исключение в Future?


person Mark    schedule 09.07.2020    source источник


Ответы (1)


Я думаю, не нужно перезапускать полный поток, если вы завалили только одно будущее. Я предлагаю повторить попытку только Future. Например, вы можете написать такой код, который повторит ваш вызов, заменив Future.successful(10) при вызове вашего метода:

        val test: Future[Int] = Future(throw new RuntimeException("asd")).recoverWith {
          case NonFatal(e) =>
            Future.successful(10)
        }

        val result = for {
          y1 <- test
        } yield (y1)

Кроме того, вы можете написать код так, как хотите, он потерпит неудачу и повторит попытку, но вам нужно вернуть результат вашего Future:

  kafka.topic1.subscribe.atLeastOnce(Flow[String]
    .mapAsync(1) {
      case envelope: String =>

        val test: Future[String] = Future(throw new RuntimeException("asd"))
      /*.recoverWith {
          case NonFatal(e) =>
            Future.successful("10")
        }*/

        val result = for {
          y1 <- test
        } yield (y1)

        println(s"code block $envelope")
       result.onComplete{
          case Success(value) => println(s"Message from topic: $envelope $result")
          case Failure(exception) => println(exception.getMessage)
            throw exception
        }
      result.map(_ => Done)
    }
)
person Vladislav Kievski    schedule 09.07.2020
comment
в моем случае мне нужно повторить весь поток, если есть какие-либо исключения. собственно, будущий результат был получен через http-вызов. так что мне нужно повторить весь мой поток и повторно отправить http-вызов. мой результат дал async.map - person Mark; 09.07.2020
comment
Вы можете попробовать второй вариант, он будет постоянно повторять попытки до тех пор, пока не будет достигнут успех. - person Vladislav Kievski; 09.07.2020
comment
map().asyncasync.map(), если на то пошло) и mapAsync() - это очень разные вещи. Оператор async в потоке просто означает, что при материализации в потоке существует асинхронная граница, поэтому сообщения могут обрабатываться одновременно с обеих сторон (конвейерная обработка). mapAsync (и другие операторы, которые представляют собой некоторые синхронные операторы с добавлением Async) обозначают, что логика этапа является асинхронной (включая будущее). - person Levi Ramsey; 09.07.2020