Я получаю некоторые данные по частям и хочу обработать точки данных с минимальным временным расстоянием. (Вы можете сформулировать это как throttleTime
без потери данных или как противоположность buffer
). После тонны попыток и поисков в Google я пришел к следующему решению:
dataChunkReceiver
.pipe(
switchMap(chunk => of(...chunk)), // spread chunk of data-points to distinct events
zip(interval(minimumInterval)), // make sure events don't get executed more often than minimumInterval
map(itemAndNumber => itemAndNumber[0]) // only forward the data-point
)
zip(interval(minimumInterval))
-решение отлично работает, скажем, с событиями мыши, но не в этой комбинации с switchMap и чанками. Он по-прежнему работает как шарм, когда куски приходят чаще, чем minimumInterval
, но не в том случае, если они медленнее. Не могу понять почему, может кто меня просветит?
Чтобы увидеть проблему, попробуйте мой Stackblitz набор chunkInterval
(интервал, в котором прибывают новые порции данных) до 2000, а minimumInterval
- 400. Как вы можете видеть, все точки данных из каждого фрагмента обрабатываются одновременно. Когда вы устанавливаете chunkInterval
на 400, они выходят каждые 500 миллисекунд, как и ожидалось. Для 2000 года chunkInterval
я ожидал, что точки данных будут в миллисекундах.
2000 | 2500 | 3000 | 4000 | 4500 | 5000 | 6000 | 6500 | 7000
Я уверен, что это должно быть тонкое заблуждение в моем решении, пожалуйста, помогите найти его!
zip
не работает сchunkInterval: 2000
&minimumInterval: 400
, потому чтоinterval
испустил три раза к тому времени, когда будет отправлен первый кусок из 3 элементов. Таким образом,zip
может заархивировать 3 входящих элемента чанка с 3 уже имеющимися элементами интервала, не дожидаясь дополнительных элементов интервала. Вы можете использовать rxmarbles.com/#zip, чтобы понять, когда излучаетzip
. Кстати.switchMap(chunk => of(...chunk))
можно заменить наswitchMap(chunk => chunk)
, которое можно заменить наswitchAll()
- person frido   schedule 21.08.2020