Доступ к базовому объекту ActorRef потока akka Источник, созданный Source.actorRef

Я пытаюсь использовать Source.actorRef для создания akka.stream.scaladsl.Source объект. Что-то в форме

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

У меня вопрос: как мне отправить данные в мой исходный объект на основе ActorRef?

Я предположил, что отправка сообщений Источнику была чем-то вроде формы

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

Но weatherSource не имеет оператора ! или метода tell.

документация не слишком описание того, как использовать Source.actorRef, оно просто говорит, что вы можете...

Заранее благодарим Вас за отзыв и ответ.


person Ramón J Romero y Vigil    schedule 11.06.2015    source источник


Ответы (3)


Вам нужен Flow:

  import akka.stream.OverflowStrategy.fail
  import akka.stream.scaladsl.Source
  import akka.stream.scaladsl.{Sink, Flow}

  case class Weather(zip : String, temp : Double, raining : Boolean)

  val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

  val sunnySource = weatherSource.filter(!_.raining)

  val ref = Flow[Weather]
    .to(Sink.ignore)
    .runWith(sunnySource)

  ref ! Weather("02139", 32.0, true)

Помните, что все это экспериментально и может измениться!

person Noah    schedule 11.06.2015
comment
В M5 похоже, что Source.actorRef не существует. Вы знаете, куда он переехал? - person Ramón J Romero y Vigil; 11.06.2015
comment
Похоже, они в основном изменили это, чтобы передать Props источнику. Обновленная документация находится здесь http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html - person Noah; 11.06.2015
comment
1.0-RC3 — это самая последняя версия, и Source.actorRef по-прежнему находится в том же месте: http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/#akka.stream.scaladsl.Source$ - person jrudolph; 12.06.2015
comment
Source(Props) в M5, теперь Source.actorPublisher в RC3 — это что-то другое: он используется для создания источника, поддерживаемого пользовательской реализацией ActorPublisher. - person jrudolph; 12.06.2015
comment
хм, у меня похожая проблема при попытке получить базовый ActorRef, но мне нужна ссылка, прежде чем я смогу создать свой Sink. Можно ли выбросить этот Flow и создать другой Flow? - person fommil; 12.07.2015
comment
Я создаю свой Sink с Sink.create(subscriber). Это возвращает Sink, который материализуется как Unit. Есть ли способ сделать это и при этом получить ActorRef? - person Troy Daniels; 11.09.2015
comment
Не обращайте внимания на мой предыдущий комментарий. Я делал Source.from(...).runWith(Sink.create(sub, ec), который теряет исходную материализацию. Если вы используете Flow.runWith, вы получите оба. - person Troy Daniels; 11.09.2015

Поскольку @Noah указывает на экспериментальный характер потоков akka, его ответ может не работать с выпуском 1.0. Мне пришлось следовать примеру, приведенному в этом примере:

implicit val materializer = ActorMaterializer()
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
actorRef ! TweetInfo(...)
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)
person Thien    schedule 13.09.2015

Экземпляр ActorRef, как и все "материализованные значения", станет доступным только после материализации всего потока или, другими словами, при запуске RunnableGraph.

// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))

// You get ActorRef instance as a materialized value
val actorRef1: ActorRef = rg1.run()

// Or even more correct way: to materialize both ActorRef and future to completion 
// of the stream, so that we know when we are done:

// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
// (ActorRef, Future[Done]) when you run the graph
val rg2: RunnableGraph[(ActorRef, Future[Done])] =
  sunnySource.toMat(Sink.foreach(println))(Keep.both)

// You get both ActorRef and Future[Done] instances as materialized values
val (actorRef2, future) = rg2.run()

actorRef2 ! Weather("90210", 72.0, false)
actorRef2 ! Weather("02139", 32.0, true)
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
future onComplete { /* ... */ }
person Dmytro Mantula    schedule 29.05.2016
comment
Довести до конца и ActorRef, и future — потрясающе! Спасибо! - person AlonL; 23.09.2017