Как реорганизовать этот код с помощью потоков akka.

Идея состоит в том, чтобы оставить канал открытым, чтобы использовать его позже. В документации playframework 2.5.x говорится, что вы должны использовать потоки akka, но ничего не говорится о том, как добиться этого примера. Кто-нибудь может мне помочь?

import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext

def socket =  WebSocket.using[String] { request =>

  // Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
  val (out, channel) = Concurrent.broadcast[String]

  // log the message to stdout and send response back to client
  val in = Iteratee.foreach[String] {
    msg => println(msg)
      // the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
      // receive the pushed messages
      channel push("I received your message: " + msg)
  }
  (in,out)
}

person Carlos Hernandez Perez    schedule 14.03.2016    source источник
comment
В чем собственно вопрос?   -  person Anton    schedule 15.03.2016
comment
Возможный дубликат Play framework Как использовать потоки akka вывод в вебсокет   -  person Anton    schedule 15.03.2016
comment
@ Антон, вопрос в том, как реорганизовать код с помощью потоков akka, идея в том, чтобы иметь возможность держать канал открытым и использовать его позже в ответ на другое событие.   -  person Carlos Hernandez Perez    schedule 17.03.2016
comment
Я понимаю, но тогда было бы очень полезно, если бы вы поместили это объяснение в сам вопрос - прямо сейчас единственное предложение в посте - это не настоящий вопрос, а утверждение.   -  person Anton    schedule 17.03.2016


Ответы (3)


Вам придется сделать что-то вроде этого!

val (subscriber, publisher)=Source.asSubscriber[String]
      .toMat(Sink.asPublisher[String](fanout = true))(Keep.both).run()

def websocketAction=WebSocket.accept { requestHeader =>
    Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber),Source.fromPublisher(publisher))
}

Первая часть создаст, учитывая приемник и поток, объекты, которые вам понадобятся для отправки сообщений и их получения (подпишитесь на издателя).

наконец, вы создадите поток для каждого запроса веб-сокета, который вы получаете с этим кодом Flow.fromSinkAndSource... Что-то, что неясно в отношении потоков Akka (Sources, Sinks и Flows), заключается в том, что они представляют форму потока, но не поток как таковой ... поток идет, когда вы их материализуете (методом runWith или run). Теперь... Play получает либо Sources (при использовании событий, отправленных сервером), либо Flows при использовании WebSockets. И они все еще не материализованы... так что вам нужно их материализовать (первую строку), а затем снова создать Поток! (строка websocketAction)

Извините, если я недостаточно ясен, однако используйте этот код, он будет работать.

person Alejandro Navas    schedule 19.07.2016
comment
новый play 2.5 websocket api явно провалился - person Hans Westerbeek; 17.09.2016
comment
Добавляет синтаксические накладные расходы, но как только я их понял, я думаю, что на самом деле это довольно мощно. - person Alejandro Navas; 28.11.2016

Наконец-то я нашел решение с помощью Актеров. Я нашел это:

def conect = WebSocket.accept[JsValue, JsValue] {request => 
  ActorFlow.actorRef(out => UserWebSocket.props(out, users))
}

Затем я посмотрел исходный код ActorFlow.actorRef: https://github.com/playframework/playframework/blob/2.5.0/framework/src/play-streams/src/main/scala/play./api/libs/streams/ActorFlow.scala

и придумал это решение:

import javax.inject._
import play.api.Configuration
import play.api.mvc._
import scala.concurrent._

import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.actor._

class UserActor(out: ActorRef) extends Actor {
  def receive = {
    // receives messages from client browser here
    // out is actor that will send messages back to client(s)
    case msg: String => out ! "Received message "+msg
  }
}
object UserActor {
  def props(out: ActorRef) = Props(new UserActor(out))
}

@Singleton
class NotificationController @Inject()(val config:Configuration)
                          (implicit ec: ExecutionContext, actorSystem:ActorSystem, materializer: Materializer) extends Controller {

  // outActor can be used to send messages to client(s)
  // Sink.asPublisher(true) makes this a broadcast channel (multiple clients can connect to this channel, and messages sent to outActor are broadcast to all of them).  Use Sink.asPublisher(false) to create a unicast channel.
  val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
        .toMat(Sink.asPublisher(true))(Keep.both).run()


  def flowsocket = WebSocket.accept[String, String] {request =>
    val aflow:Flow[String, String, _] = {

        val sink = Sink.actorRef( actorSystem.actorOf(UserActor.props(outActor)), akka.actor.Status.Success(()) )

        val source = Source.fromPublisher(publisher)

        Flow.fromSinkAndSource(
            sink, source
        )
    }
    aflow
  }

}

С тех пор я пересмотрел свое решение, чтобы более полно использовать модель Актера. Теперь у меня есть «UsersBroadcastActor», который является одноэлементным актером, к которому подключаются все остальные «UserActor» и могут общаться через него:

lazy val broadcastActorRef = actorSystem.actorOf(Props[UsersBroadcastActor])

def flowsocket = WebSocket.accept[JsValue, JsValue] { request =>
    ActorFlow.actorRef(out => UserActor.props(out, broadcastActorRef))
}

Когда создается экземпляр UserActor, в своем методе preStart() он отправляет сообщение о подписке в BroadActorRef, которое сохраняет ссылки на все UserActor, которые «подписываются» на него. Я могу отправить сообщение на BroadActorRef, и оно пересылает его каждому из UserActors. Дайте мне знать, если вам нужен полный пример кода этого решения.

person Leonya    schedule 24.03.2016

Я думаю, вы просто ищете, как установить соединение через веб-сокет Echo с Play 2.5 и потоком Akka Streams.

Это должно помочь

  def socket = WebSocket.accept[String, String] { request =>
    Flow[String]
      .map(msg => "I received your message: " + msg)
  }
person Jonas Anso    schedule 15.03.2016
comment
На самом деле я знаю, как сделать веб-сокет Echo, но это не то, о чем я прошу. В моем примере вы можете видеть, что вы можете использовать канал для отправки сообщений, и вы можете сохранить канал и использовать его позже. Предположим, вы получаете событие, и вы должны уведомить пользователя по каналу - person Carlos Hernandez Perez; 17.03.2016