Сбои подсчета конвейера ReactiveSwift после завершения

У меня есть конвейер в ReactiveSwift для загрузки. Я хочу убедиться, что даже в случае сбоя одной из загрузок остальные не будут прерваны.

После того, как все они завершились успешно или неудачно, я должен либо вернуть успех из метода performUploads, либо, если какой-либо из них не удался, я должен вернуть ошибку, поэтому следующий шаг, часть загрузки, не начнется. Даже если есть ошибки, у всех загрузок должна быть возможность загрузиться, их нельзя останавливать.

Есть ли способ выяснить, есть ли какие-либо ошибки после завершения загрузки? Смотрите методы здесь:

let pendingUploadItemsArray: [Items] = ...
func performUploads() -> SignalProducer<(), MyError> {
    return upload(pendingUploadItemsArray)
        .then(doAnything())
}

private func upload(_ items: [Items]) -> SignalProducer<Signal<(), MyError>.Event, Never> {
    let producers = items
        .filter { item in
            return item.readyForUpload
        }
        .map { self.upload($0).materialize() }
    
    return SignalProducer.merge(producers)
}

private func upload(_ item: Item) -> SignalProducer<(), MyError> {
    return internalUploader.upload(item)
        .on(failed: failedToUpload(item),
            value: successfullyUploaded(item))
        .ignoreValues()
}

где метод загрузки internalUploader:

func upload(_ item: Item) -> SignalProducer<Any, MyError>

А затем в другом классе вы бы назвали этот загрузчик:

let sync = self.uploader.performUploads()
        .then(startDownloads())

startDownloads следует запускать только в том случае, если все загрузки завершились успешно. Спасибо за любое понимание.

Это может быть что-то, что должно быть сделано совершенно по-другому.


person Zsolt    schedule 16.11.2020    source источник


Ответы (1)


Я не знаю точно, что successfullyUploaded и failedToUpload делают в вашем коде, но, по-видимому, вы отслеживаете успехи и неудачи, чтобы обеспечить какой-то пользовательский интерфейс в реальном времени. Вот как я бы это структурировал:

struct UploadResult {
    let item: Item
    let error: Error? // nil if the upload succeeded

    var succeeded: Bool { error == nil }
    var failed: Bool { !succeeded }
}

...

static func upload(_ items: [Item]) -> SignalProducer<[UploadResult], Never> {
    SignalProducer(items)
        .filter(\.readyForUpload)
        .flatMap(.merge) { item in
            Self.internalUploader(item)
                .map { UploadResult(item: item, error: nil) }
                .flatMapError { error in
                    SignalProducer(value: UploadResult(item: item, error: error))
                }
        }
        .scan(into: [UploadResult]()) { ( results: inout [UploadResult], nextResult) in
            results.append(nextResult)
        }
}
  1. Я создаю структуру UploadResult, представляющую элемент, который удалось или не удалось загрузить.
  2. В функции upload вместо того, чтобы создавать массив производителей и затем объединять их, я преобразовываю массив элементов в производителя сигналов элементов с помощью SignalProducer(items), а затем использую flatMap(.merge) для объединения загрузок в одного производителя сигналов.
  3. Вместо использования materialize я использую map для преобразования успешных загрузок в UploadResult и использую flatMapError для преобразования неудачных загрузок в UploadResult.
  4. Я использую scan для накопления результатов по завершении каждой загрузки. Каждый раз, когда загрузка завершается (успешно или с ошибкой), scan будет отправлять обновленный массив результатов загрузки, которые можно использовать для обновления пользовательского интерфейса.

Тогда вы можете использовать его следующим образом:

Uploader.upload(someItems)
    .on(value: { resultsSoFar in
        // Update UI here
    })
    .take(last: 1)
    .attempt { results in
        if !results.allSatisfy(\.succeeded) {
            // At least one of the uploads failed, so send an error
            throw MyError()
        }
    }
    .then(startDownloads)
  1. Я использую оператор on(value:) для обновления пользовательского интерфейса на основе текущих результатов. Каждый раз, когда загрузка завершается успешно или неудачно, это закрытие будет вызываться с обновленными результатами.
  2. Я использую take(last: 1) для фильтрации всех промежуточных результатов; он отправит окончательные результаты только после завершения всех загрузок.
  3. Я использую attempt, чтобы проверить, не произошла ли какая-либо загрузка, и если это так, выдает ошибку. Это гарантирует, что загрузка будет запущена только в том случае, если все загрузки прошли успешно.

Надеюсь, это подходит для вашего варианта использования, но дайте мне знать в комментарии, если я что-то пропустил в более широком контексте!

РЕДАКТИРОВАТЬ

Если вам нужно работать с результатами только по одному, а не как с текущим массивом, вы можете избавиться от scan, а затем заменить take(last: 1) на collect:

static func upload(_ items: [Item]) -> SignalProducer<UploadResult, Never> {
    SignalProducer(items)
        .filter(\.readyForUpload)
        .flatMap(.merge) { item in
            Self.internalUploader(item)
                .map { UploadResult(item: item, error: nil) }
                .flatMapError { error in
                    SignalProducer(value: UploadResult(item: item, error: error))
                }
        }
}

...

Uploader.upload(someItems)
    .on(value: { latestResult in
        // Do something with the latest result
    })
    .collect()
    .attempt { results in
        if !results.allSatisfy(\.succeeded) {
            // At least one of the uploads failed, so send an error
            throw MyError()
        }
    }
    .then(startDownloads)
person jjoelson    schedule 16.11.2020