Alpakka S3 загружает файл из корзины, сохраняет в файл и имеет имя файла, доступное для следующей части в потоке.

Я пытаюсь создать код, который использует ключи S3, затем загружает эти файлы с S3, затем сохраняет эти данные в файл на диске с именем ключа (необходимым для дальнейшего процесса) и в качестве вывода возвращает ключ/имя файла. Что у меня есть до сих пор;

    val x: Sink[String, Future[IOResult]] =
      Flow[String].flatMapConcat(key => S3.download("somebucket", key)).
        withAttributes(S3Attributes.settings(useVersion1Api)).
        collect{ case Some(x) => x._1 }.
        flatMapConcat(identity).toMat(FileIO.toPath(Paths.get("???????")))(Keep.right)

То, что у меня сейчас есть, загружает файл, но не загружает; - использовать имя ключа в качестве имени файла - возвращает имя файла (это должен быть не сток, а поток)

Буду признателен за любые указатели. Я только начал с потоков alpakka и akka. Вероятно, мне нужно как-то передать ключ в кортеже, но я не могу понять, как позже использовать эту часть кортежа.

Использование чантепа, первое предложение, вероятно, остановилось;

    val s3FileSaveFlow: Flow[String, (String, ObjectMetadata), NotUsed] =
      Flow[String].flatMapConcat(key => S3.download("somebucket", key) collect{ case Some(src) => key -> src}).
        flatMapConcat{ case (key,(src,meta)) => {
          src.to(FileIO.toPath(Paths.get(key)))
          Source.single((key,meta))
        }}

person Aktaeon    schedule 21.11.2019    source источник


Ответы (1)


val x: Sink[String, Future[IOResult]] =
  Flow[String].flatMapConcat(key => 
    S3.download("somebucket", key).collect {
      case Some(src) => key -> SRC
    })

Тогда у вас есть key и байт src.

Если вы хотите взглянуть на Benji S3 DSL (я участник из):

import akka.stream.scaladsl.FileIO

Flow[String].flatMapConcat { objKey =>
  s3.bucket("somebucket").obj(objKey).get().
    viaMat(FileIO.toPath(Path.get("/basedir", objKey)))(Keep.right)
}
person cchantep    schedule 21.11.2019
comment
Спасибо за вашу помощь и предложения, я отредактировал свой ответ, чтобы отразить изменение кода на основе вашего первого предложения. Поскольку у вас есть опыт, будет ли правильно использовать Source.single в flatMapConcat? - person Aktaeon; 22.11.2019
comment
{ src.to(FileIO.toPath(Paths.get(key))) Source.single((key,meta)) } имеет побочный эффект, не рекомендуется - person cchantep; 22.11.2019
comment
Каким будет правильный способ возврата (ключ, мета), чтобы все это было потоком? Проверка github.com/akka/akka/pull/25242/commits/ мне кажется, что есть оптимизация для возврата source.single внутри flatmapconcat. Твои мысли? - person Aktaeon; 22.11.2019