Alpakka предоставляет отличный способ доступа к десяткам различных источников данных. Файловые источники, такие как HDFS и FTP, поставляются как Source[ByteString, Future[IOResult]
. Однако HTTP-запросы через Akka HTTP доставляются как потоки сущностей Source[ByteString, NotUsed]
. В моем случае я хотел бы получать контент из источников HTTP как Source[ByteString, Future[IOResult]
, чтобы я мог создать унифицированный сборщик ресурсов, который работает с несколькими схемами (в данном случае hdfs, файл, ftp и S3).
В частности, я хотел бы преобразовать источник Source[ByteString, NotUsed]
в Source[ByteString, Future[IOResult]
, где я могу вычислить IOResult из входящего потока байтов. Существует множество методов, таких как flatMapConcat
и viaMat
, но ни один из них, похоже, не может извлечь детали из входного потока (например, количество прочитанных байтов) или правильно инициализировать структуру IOResult
. В идеале я ищу метод со следующей сигнатурой, который будет обновлять IOResult по мере поступления потока.
def matCalc(src: Source[ByteString, Any]) = Source[ByteString, Future[IOResult]] = {
src.someMatFoldMagic[ByteString, IOResult](IOResult.createSuccessful(0))(m, b) => m.withCount(m.count + b.length))
}