Использование mapFuture в потоках akka

Я играю с Akka Streams и пытаюсь немного обогатить и обработать события, полученные из коллекции MongoDB. Однако у меня есть некоторые сомнения относительно наилучшего подхода к реализации моих средств обогащения событий, которым может потребоваться подключение к внешнему источнику данных.

MapFuture кажется подходящим, но у меня есть некоторые проблемы:

class EventEnricherActor extends Actor with ActorLogging {

  // ...

  def receive = {
    case e: Event =>
      sender ! augmentEvent(e)
  }

}

И мое приложение:

val enricherActor = actorSystem.actorOf(Props[EventEnricherActor])

Flow(mongodbConsumer).
      mapFuture(msg => enricherActor ? msg).
      onComplete(materializer) { _ => actorSystem.shutdown()}

Однако я застрял в этой ошибке:

java.lang.ClassCastException: messages.Event cannot be cast to scala.runtime.Nothing$

при вызове mapFuture.

Что мне не хватает?

Любые лучшие идеи, чтобы справиться с этим обогащением?

Обновление Трассировка стека:

[ERROR] [08/12/2014 11:48:07.314] [actor-system-akka.actor.default-dispatcher-2] [akka://actor-system/user/flow-1-2-transform] failure during processing
java.lang.ClassCastException: messages.Event cannot be cast to scala.runtime.Nothing$
    at apps.Consume$$anonfun$1$$anonfun$apply$1.apply(Consume.scala:47)
    at akka.stream.impl.MapFutureProcessorImpl$$anonfun$1.apply$mcV$sp(MapFutureProcessorImpl.scala:125)
    at akka.stream.impl.Pump$$anonfun$pump$1.apply$mcV$sp(Transfer.scala:163)
    at akka.stream.impl.Pump$$anonfun$pump$1.apply(Transfer.scala:163)
    at akka.stream.impl.Pump$$anonfun$pump$1.apply(Transfer.scala:163)
    at akka.stream.impl.ActorBasedFlowMaterializer$.withCtx(ActorBasedFlowMaterializer.scala:133)
    at akka.stream.impl.Pump$class.pump(Transfer.scala:163)
    at akka.stream.impl.ActorProcessorImpl.pump(ActorProcessor.scala:238)
    at akka.stream.impl.BatchingInputBuffer.enqueueInputElement(ActorProcessor.scala:93)
    at akka.stream.impl.BatchingInputBuffer$$anonfun$upstreamRunning$1.applyOrElse(ActorProcessor.scala:140)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at akka.stream.impl.SubReceive.apply(Transfer.scala:18)
    at akka.stream.impl.SubReceive.apply(Transfer.scala:14)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:14)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:165)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.stream.impl.ActorProcessorImpl.aroundReceive(ActorProcessor.scala:238)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Спасибо


person FabioC    schedule 11.08.2014    source источник
comment
У вас есть трассировка стека?   -  person Patrik Nordwall    schedule 12.08.2014
comment
@PatrikNordwall обновлен с помощью трассировки стека   -  person FabioC    schedule 12.08.2014


Ответы (1)


Моя ошибка, mongodbConsumer (ActorProducer) был нетипизирован!

Итак, это:

val mongodbConsumer = ActorProducer[Event](system.actorOf(MongodbConsumerActor.props(db)))

Вместо:

val mongodbConsumer = ActorProducer(system.actorOf(MongodbConsumerActor.props(db)))
person FabioC    schedule 13.08.2014