Убедитесь, что существует минимальная временная задержка между событиями

Я получаю некоторые данные по частям и хочу обработать точки данных с минимальным временным расстоянием. (Вы можете сформулировать это как throttleTime без потери данных или как противоположность buffer). После тонны попыток и поисков в Google я пришел к следующему решению:

dataChunkReceiver
  .pipe(
    switchMap(chunk => of(...chunk)), // spread chunk of data-points to distinct events
    zip(interval(minimumInterval)), // make sure events don't get executed more often than minimumInterval
    map(itemAndNumber => itemAndNumber[0]) // only forward the data-point
  )

zip(interval(minimumInterval))-решение отлично работает, скажем, с событиями мыши, но не в этой комбинации с switchMap и чанками. Он по-прежнему работает как шарм, когда куски приходят чаще, чем minimumInterval, но не в том случае, если они медленнее. Не могу понять почему, может кто меня просветит?

Чтобы увидеть проблему, попробуйте мой Stackblitz набор chunkInterval (интервал, в котором прибывают новые порции данных) до 2000, а minimumInterval - 400. Как вы можете видеть, все точки данных из каждого фрагмента обрабатываются одновременно. Когда вы устанавливаете chunkInterval на 400, они выходят каждые 500 миллисекунд, как и ожидалось. Для 2000 года chunkInterval я ожидал, что точки данных будут в миллисекундах.

2000 | 2500 | 3000 | 4000 | 4500 | 5000 | 6000 | 6500 | 7000

Я уверен, что это должно быть тонкое заблуждение в моем решении, пожалуйста, помогите найти его!


person Paflow    schedule 21.08.2020    source источник
comment
Как ни странно, я не наткнулся на поток, содержащий решение в моем поиске, но отмеченный как дублированный поток действительно содержит `2. ConcatMap к решению таймера, которое помогло. Большое спасибо за то, что нашел его для меня. Я до сих пор не знаю, почему мои решения терпят неудачу, поскольку проблемный характер упоминается, но не объясняется в потоке, но в любом случае.   -  person Paflow    schedule 21.08.2020
comment
подход zip не работает с chunkInterval: 2000 & minimumInterval: 400, потому что interval испустил три раза к тому времени, когда будет отправлен первый кусок из 3 элементов. Таким образом, zip может заархивировать 3 входящих элемента чанка с 3 уже имеющимися элементами интервала, не дожидаясь дополнительных элементов интервала. Вы можете использовать rxmarbles.com/#zip, чтобы понять, когда излучает zip. Кстати. switchMap(chunk => of(...chunk)) можно заменить на switchMap(chunk => chunk), которое можно заменить на switchAll()   -  person frido    schedule 21.08.2020