Полоса загрузки с потоковой передачей Akka

Возможно ли иметь какую-то полосу загрузки с потоковой передачей Akka? Я ищу что-то, что может дать состояние продвижения источника.

Source via loadingBar(expectedElment) via someThingElse to Sink

где expectedElement — количество элементов, которые должны пройти при достижении «100%».


person crak    schedule 16.02.2016    source источник


Ответы (1)


Если вы хотите отслеживать ход выполнения вне потока, я рекомендую использовать Agent. Агент будет хранить текущий счетчик:

import akka.agent.Agent
import akka.stream.scaladsl.Flow

def runningCountFlow[T](agent : Agent[Float]) = Flow[T].map { val =>
  agent send (_ + 1.0f)
  val
} 

Затем вы можете отслеживать агент вне потока:

import scala.concurrent.ExecutionContext.Implicits.global

val expectedCount = 42.toFloat
val countAgent = Agent(0.0f)

type DataType = ???

val stream = 
  Source via runningCountFlow[DataType](countAgent) via someThingElse runWith Sink

Thread.sleep(5000) //let the stream run for a while

val percentComplete = countAgent.get / expectedCount

println(s"stream is $percentComplete complete")
person Ramón J Romero y Vigil    schedule 16.02.2016
comment
Агент решает параллельную проблему (я надеюсь). Но, может быть, что-то вроде этого было бы лучше ``` def runningCountFlow[T](agent : Agent[Float],show:Interval) = Flow[T].map { val =› agent send (_ + 1.0f) if(agent.get % interval) { println(sstream is $percentComplete Complete) } val } ``` - person crak; 17.02.2016
comment
Агенты устарели в Akka 2.6. doc.akka.io/docs/akka/current/project/ - person Meni; 25.09.2020