Как установить текущий индекс или двигаться назад, хотя наблюдаемый rxjs?

Я использую наблюдаемую rxjs для моделирования последовательности шагов, которые происходят в моем приложении.

Псевдокод

const steps = [{id: "step1"}, {id: "step2"}, {id: "step3"}]

//... then at some point later do things based on the steps
from(steps).pipe(
    concatMap((step) => {
        ... do stuff with each step, perhaps including needing to go back to previous step
    })
)

Учитывая этот шаблон, как я могу смоделировать «возврат», если это необходимо по некоторым условиям?

Например, переход от шага 1 -> шаг 2 -> шаг 3 -> шаг 2 -> шаг 3.

Из того, что я могу сказать, rxjs кажется довольно строго односторонним. Я не нашел способа установить положение/индекс наблюдаемого (если это правильный способ его осмысления).

Извините, если я упустил что-то очевидное, но похоже, что это может быть довольно распространенным вариантом использования наблюдаемых, созданных из массивов.


person imagio    schedule 03.06.2020    source источник
comment
Учитывая, что Observables — это потоки событий, которые происходят в определенные моменты времени, может быть сложно вернуться назад ;-) Думаю, нам нужно двигаться быстрее скорости света.   -  person ggradnig    schedule 04.06.2020
comment
Возможно, вы можете сохранить шаги в каком-то списке.   -  person Roberto Zvjerković    schedule 04.06.2020
comment
По сути, вы не можете знать, пришел ли поток из массива какого-либо другого источника. Вы всегда можете использовать оператор repeat для повторной подписки на источник. Если будет холодно, как в вашем случае, вы снова получите все уведомления. Если жарко, вам нужно будет использовать что-то вроде shareReplay, чтобы воспроизвести прошлые события.   -  person ggradnig    schedule 04.06.2020


Ответы (1)


Я нашел способ сделать это, но я не уверен, что это "правильный" шаблон.

Я создал субъект и заставил его излучать значения в себя, чтобы контролировать, какой элемент из моей последовательности будет следующим. Выглядит так:

const steps = [{id: "step1"}, {id: "step2"}, {id: "step3"}, {id: "step4"}]

const runMySequence = async () => {

    let currentStep = steps[0]
    const sub = new Subject<typeof steps[number]>()

    sub.pipe(
         tap((step) => currentStep = step),
         concatMap(....process the step, do all sorts of stuff),
         tap((step) => {
             //Here's where we decide which step to execute next
             const idx = steps.indexOf(currentStep)
             if (idx !== steps.length - 1) {
                //Based on whatever condition we could decide to "go back" to a previous step here if we wanted
                sub.next(steps[idx + 1])
             } else {
                 //All out of steps, complete the subject
                 sub.complete()
             }
          })).subscribe()

    //kick off the sequence
    sub.next(steps[0])

    //and wait for it to finish
    await sub.toPromise()

}

Это кажется довольно чистым способом добиться этого. Любые мысли о том, почему у этого шаблона могут быть проблемы или лучшие способы сделать это?

person imagio    schedule 04.06.2020