Сопоставить или присоединиться к событию и правилу из двух тем kafka в одном потоке данных

Я хочу объединить две темы кафки в один поток данных.

На самом деле два потока данных должны иметь один и тот же идентификатор, чтобы выполнить соединение. Событие — это данные, поступающие от датчиков, а правило содержит правила, которые будут проверяться с помощью CEP (поступающие из пользовательского интерфейса).

Вот мой тест, но он не работает, кто-нибудь может мне помочь?

DataStream<Object> evtAndRule=inputEventStream.join(rulesStream)
            .where(new KeySelector<TrackEvent, Object>() {
                @Override
                public Object getKey(Event event) throws Exception {
                    return event.getId();
                }
            }).equalTo(new KeySelector<RulesEvent, Object>() {
                @Override
                public Object getKey(RulesEvent rulesEvent) throws Exception {
                    return rulesEvent.getId();
                }
            }).window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
            .apply(new FlatJoinFunction<TrackEvent, RulesEvent, Object>() {
                @Override
                public void join(TrackEvent trackEvent, RulesEvent rulesEvent, Collector<Object> collector) throws Exception {
            ....

                }
            });

person Tarek Khal    schedule 24.04.2017    source источник


Ответы (1)


Я пробовал это, но не знаю, как получить нужное правило и является ли это лучшим решением

        DataStream<Tuple2<Event , RulesEvent>> evtAndRule= inputEventStream.map(new MapFunction<Event , Tuple2<Event , RulesEvent>>() {
        @Override
        public Tuple2<Event , RulesEvent> map(final Event event) throws Exception {

            return new Tuple2<Event , RulesEvent>(event, new RulesEvent());
        }
    });
person Tarek Khal    schedule 24.04.2017