Наконец-то я нашел решение с помощью Актеров. Я нашел это:
def conect = WebSocket.accept[JsValue, JsValue] {request =>
ActorFlow.actorRef(out => UserWebSocket.props(out, users))
}
Затем я посмотрел исходный код ActorFlow.actorRef: https://github.com/playframework/playframework/blob/2.5.0/framework/src/play-streams/src/main/scala/play./api/libs/streams/ActorFlow.scala
и придумал это решение:
import javax.inject._
import play.api.Configuration
import play.api.mvc._
import scala.concurrent._
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.actor._
class UserActor(out: ActorRef) extends Actor {
def receive = {
// receives messages from client browser here
// out is actor that will send messages back to client(s)
case msg: String => out ! "Received message "+msg
}
}
object UserActor {
def props(out: ActorRef) = Props(new UserActor(out))
}
@Singleton
class NotificationController @Inject()(val config:Configuration)
(implicit ec: ExecutionContext, actorSystem:ActorSystem, materializer: Materializer) extends Controller {
// outActor can be used to send messages to client(s)
// Sink.asPublisher(true) makes this a broadcast channel (multiple clients can connect to this channel, and messages sent to outActor are broadcast to all of them). Use Sink.asPublisher(false) to create a unicast channel.
val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
.toMat(Sink.asPublisher(true))(Keep.both).run()
def flowsocket = WebSocket.accept[String, String] {request =>
val aflow:Flow[String, String, _] = {
val sink = Sink.actorRef( actorSystem.actorOf(UserActor.props(outActor)), akka.actor.Status.Success(()) )
val source = Source.fromPublisher(publisher)
Flow.fromSinkAndSource(
sink, source
)
}
aflow
}
}
С тех пор я пересмотрел свое решение, чтобы более полно использовать модель Актера. Теперь у меня есть «UsersBroadcastActor», который является одноэлементным актером, к которому подключаются все остальные «UserActor» и могут общаться через него:
lazy val broadcastActorRef = actorSystem.actorOf(Props[UsersBroadcastActor])
def flowsocket = WebSocket.accept[JsValue, JsValue] { request =>
ActorFlow.actorRef(out => UserActor.props(out, broadcastActorRef))
}
Когда создается экземпляр UserActor, в своем методе preStart() он отправляет сообщение о подписке в BroadActorRef, которое сохраняет ссылки на все UserActor, которые «подписываются» на него. Я могу отправить сообщение на BroadActorRef, и оно пересылает его каждому из UserActors. Дайте мне знать, если вам нужен полный пример кода этого решения.
person
Leonya
schedule
24.03.2016