Я пытаюсь создать код, который использует ключи S3, затем загружает эти файлы с S3, затем сохраняет эти данные в файл на диске с именем ключа (необходимым для дальнейшего процесса) и в качестве вывода возвращает ключ/имя файла. Что у меня есть до сих пор;
val x: Sink[String, Future[IOResult]] =
Flow[String].flatMapConcat(key => S3.download("somebucket", key)).
withAttributes(S3Attributes.settings(useVersion1Api)).
collect{ case Some(x) => x._1 }.
flatMapConcat(identity).toMat(FileIO.toPath(Paths.get("???????")))(Keep.right)
То, что у меня сейчас есть, загружает файл, но не загружает; - использовать имя ключа в качестве имени файла - возвращает имя файла (это должен быть не сток, а поток)
Буду признателен за любые указатели. Я только начал с потоков alpakka и akka. Вероятно, мне нужно как-то передать ключ в кортеже, но я не могу понять, как позже использовать эту часть кортежа.
Использование чантепа, первое предложение, вероятно, остановилось;
val s3FileSaveFlow: Flow[String, (String, ObjectMetadata), NotUsed] =
Flow[String].flatMapConcat(key => S3.download("somebucket", key) collect{ case Some(src) => key -> src}).
flatMapConcat{ case (key,(src,meta)) => {
src.to(FileIO.toPath(Paths.get(key)))
Source.single((key,meta))
}}