Изменить материализованное значение в источнике, используя содержимое потока

Alpakka предоставляет отличный способ доступа к десяткам различных источников данных. Файловые источники, такие как HDFS и FTP, поставляются как Source[ByteString, Future[IOResult]. Однако HTTP-запросы через Akka HTTP доставляются как потоки сущностей Source[ByteString, NotUsed]. В моем случае я хотел бы получать контент из источников HTTP как Source[ByteString, Future[IOResult], чтобы я мог создать унифицированный сборщик ресурсов, который работает с несколькими схемами (в данном случае hdfs, файл, ftp и S3).

В частности, я хотел бы преобразовать источник Source[ByteString, NotUsed] в Source[ByteString, Future[IOResult], где я могу вычислить IOResult из входящего потока байтов. Существует множество методов, таких как flatMapConcat и viaMat, но ни один из них, похоже, не может извлечь детали из входного потока (например, количество прочитанных байтов) или правильно инициализировать структуру IOResult. В идеале я ищу метод со следующей сигнатурой, который будет обновлять IOResult по мере поступления потока.

  def matCalc(src: Source[ByteString, Any]) = Source[ByteString, Future[IOResult]] = {
    src.someMatFoldMagic[ByteString, IOResult](IOResult.createSuccessful(0))(m, b) => m.withCount(m.count + b.length))
  }

person David Weber    schedule 21.01.2019    source источник


Ответы (2)


я не могу вспомнить какой-либо существующий функционал, который может это сделать из коробки, но вы можете использовать alsoToMat (на удивление не нашел его в документации akka streams, хотя вы можете посмотреть его в документация по исходному коду и Java API) функция потока вместе с Sink .fold, чтобы накопить некоторое значение и отдать его в самом конце. например:

def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
    source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)

дело в том, что alsoToMat объединяет входное значение мата с тем, что указано в alsoToMat. в то же время значения, созданные источником, не зависят от приемника в alsoToMat:

def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
  viaMat(alsoToGraph(that))(matF)

не так сложно адаптировать эту функцию для возврата IOResult, что соответствует исходному коду:

final case class IOResult(count: Long, status: Try[Done]) { ... }

еще одна последняя вещь, на которую вам нужно обратить внимание - вы хотите, чтобы ваш источник выглядел так:

Source[ByteString, Future[IOResult]]

но если вы не хотите нести это матовое значение до самого конца определения потока, а затем делать что-то на основе этого будущего завершения, это может быть подход, подверженный ошибкам. например, в этом примере я заканчиваю работу на основе этого будущего, поэтому последнее значение не будет обработано:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object App extends App {

  private implicit val sys: ActorSystem = ActorSystem()
  private implicit val mat: ActorMaterializer = ActorMaterializer()
  private implicit val ec: ExecutionContext = sys.dispatcher

  val source: Source[Int, Any] = Source((1 to 5).toList)

  def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
    source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)

  val f = magic(source).throttle(1, 1.second).toMat(Sink.foreach(println))(Keep.left).run()
  f.onComplete(t => println(s"f1 completed - $t"))
  Await.ready(f, 5.minutes)


  mat.shutdown()
  sys.terminate()
}
person Serhii Shynkarenko    schedule 22.01.2019
comment
Именно то, что я был после! - person David Weber; 23.01.2019

Это можно сделать, используя Promise для распространения материализованного значения.

val completion = Promise[IoResult]
val httpWithIoResult = http.mapMaterializedValue(_ => completion.future)

Теперь осталось выполнить обещание completion, когда соответствующие данные станут доступны.

Альтернативным подходом может быть переход к GraphStage API, где вы получаете более низкий уровень управления распространением материализованных значений. Но даже там использование Promises часто является выбранной реализацией для распространения материализованных значений. Взгляните на встроенные реализации операторов, такие как Ignore.

person dvim    schedule 21.01.2019