У меня есть два потока, streamA
и streamB
. Каждый из streamA
имеет идентификатор, и соответствующее событие в streamB
будет иметь такой же идентификатор.
Я хочу узнать от streamA
, какие идентификаторы не пришли streamB
после скользящего окна, скажем, 1 минуты.
Я пробовал это, но не получилось:
from streamA as A join streamB#window.time(1 min) as B on A.id == B.id select S.Id insert expired events into streamC;
Дайте мне знать, как решить эту проблему.