Akka Streams разделяет поток по типу

У меня есть следующая простая иерархия классов case:

sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message

И у меня есть Flow[Message, Message, NotUsed] (из протокола на основе Websocket с уже установленным кодеком).

Я хочу демультиплексировать этот Flow[Message] в отдельные потоки для типов Foo и Baz, так как они обрабатываются совершенно разными путями.

Каков самый простой способ сделать это? Должно быть очевидно, но я что-то упускаю...


person Alexander Temerev    schedule 05.11.2016    source источник


Ответы (1)


Один из способов — создать RunnableGraph, который включает потоки для каждого типа сообщений.

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>

  val in = Source(...)  // Some message source
  val out = Sink.ignore

  val foo = builder.add(Flow[Message].map (x => x match { case f@Foo(_) => f }))
  val baz = builder.add(Flow[Message].map (x => x match { case b@Baz(_) => b }))
  val partition = builder.add(Partition[Message](2, {
    case Foo(_) => 0
    case Baz(_) => 1
  }))

  partition ~> foo ~> // other Flow[Foo] here ~> out
  partition ~> baz ~> // other Flow[Baz] here ~> out

  ClosedShape
}

g.run()
person Alan Effrig    schedule 05.11.2016
comment
Правильно, раздел. Хорошо, я мог бы сделать именно это. Вероятно, для этого было бы полезно иметь встроенный комбинатор; возможно, я сделаю запрос на вытягивание. - person Alexander Temerev; 05.11.2016
comment
@AlexanderTemerev Это может представлять интерес: doc.akka.io/api/akka/2.4/ - person Brian; 06.11.2016
comment
Ссылка мертва. Есть ли какой-нибудь комбинатор для этого? - person mirelon; 23.10.2020
comment
PartitionWith находится рядом с github.com/akka/akka-stream-contrib/blob/master/src/main/scala/ - person mirelon; 23.10.2020