Сочетание маршрутизации спрея и сопоставления шаблонов актеров

Следуя документации Akka Cluster, у меня запущен пример Worker Dial-in.

http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html

Поэтому я пытаюсь интегрировать это с маршрутизацией распыления.

Моя идея состоит в том, чтобы иметь кластер за кулисами и через http rest вызывать эту службу.

Итак, у меня есть следующий код.

object Boot extends App {

  val port = if (args.isEmpty) "0" else args(0)
  val config =
    ConfigFactory
      .parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]"))
      .withFallback(ConfigFactory.load())

  val system = ActorSystem("ClusterSystem", config)
  val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
  implicit val actSystem = ActorSystem()

  IO(Http) ! Http.Bind(frontend, interface = config.getString("http.interface"), port = config.getInt("http.port"))
}

class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0
  implicit val timeout = Timeout(5 seconds)

  override def receive: Receive = {

    case _: Http.Connected => sender ! Http.Register(self)

    case HttpRequest(GET, Uri.Path("/job"), _, _, _) =>

      jobCounter += 1
      val backend = backends(jobCounter % backends.size)

      val originalSender = sender()

      val future : Future[TransformationResult] = (backend ? new TransformationJob(jobCounter + "-job")).mapTo[TransformationResult]
      future onComplete {
        case Success(s) =>
          println("received from backend: " + s.text)
          originalSender ! s.text
        case Failure(f) => println("error found: " + f.getMessage)
      }

    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      jobCounter += 1
      backends(jobCounter % backends.size) forward job

    case BackendRegistration if !backends.contains(sender()) =>
      println("backend registered")
      context watch sender()
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}

Но что я действительно хочу сделать, так это объединить маршрутизацию распыления с этим сопоставлением шаблонов.

Вместо того, чтобы писать свой GET, как указано выше, я хотел бы написать так:

path("job") {
  get {
    respondWithMediaType(`application/json`) {
      complete {
        (backend ? new TransformationJob(jobCounter + "-job")).mapTo[TransformationResult]
      }
    }
  }
}

Но расширяя моего Актера этим классом, я должен сделать следующее

def receive = runRoute(defaultRoute)

Как я могу объединить этот подход с моими методами сопоставления шаблонов TransformationFrontend Actor? BackendRegistration, Terminated, TransformationJob?


person Thiago Pereira    schedule 17.06.2015    source источник
comment
Как насчет перенаправления ваших запросов Spray другому актеру, у которого есть логика для сопоставления с образцом. Если вы можете передать requestContext в сообщении, чтобы оттуда выполнить HTTP-запрос.   -  person Soumya Simanta    schedule 18.06.2015


Ответы (1)


Вы можете составить PartialFunction как Receive с PartialFunction.orElse:

class TransformationFrontend extends Actor {
  // ...
  def myReceive: Receive = {
    case job: TransformationJob => // ...
    // ...
  }
  def defaultRoute: Route =
    get {
      // ...
    }
  override def receive: Receive = runRoute(defaultRoute) orElse myReceive
}

Тем не менее, часто имеет смысл разделить функциональность на несколько акторов (как предложено в комментарии выше), если это возможно.

person jrudolph    schedule 18.06.2015
comment
Отлично, я был так сосредоточен, пытаясь смешать оба вместе, что никогда не думал о разделении методов приема. Спасибо. - person Thiago Pereira; 18.06.2015