Интеграция WSO2 Siddhi CEP и Kafka

В настоящее время я нахожусь в процессе интеграции Siddhi CEP WSO2 и Kafka. Я хочу создать поток Сиддхи, получая события от Кафки. Полученные данные Kafka имеют формат JSON, где каждое событие выглядит примерно так:

{  
   "event":{  
      "orderID":"1532538588320",
      "timestamps":[  
         15325,
         153
      ],
      "earliestTime":1532538
   }
}

SiddhiApp, который я пытаюсь запустить в потоковом процессоре WSO2, выглядит так:

@App:name('KafkaSiddhi')
@App:description('Consume events from a Kafka Topic and print the output.')

-- Streams
@source(type='kafka', 
topic.list = 'order-aggregates',
partition.no.list = '0',
threading.option = 'single.thread',
group.id = 'time-aggregates',
bootstrap.servers = 'localhost:9092, localhost:2181',
@map(type='json'))
define stream TimeAggregateStream (orderID string,timestamps 
object,earliestTime long);

@sink(type="log")
define stream TimeAggregateResultStream (orderID string, timestamps 
object, earliestTime long);

-- Queries
from TimeAggregateStream 
select orderID, timestamps, earliestTime
insert into TimeAggregateResultStream;

Запуск этого приложения должен регистрировать все данные, обновляемые в кластере Kafka с агрегатами заказов, который я слушаю. Но я не вижу никакого вывода, когда нажимаю Run.

Я могу сказать, что существует некоторый тип взаимодействия между потоковым процессором WSO2 и темой агрегирования заказов, потому что сообщения об ошибках выводятся в реальном времени всякий раз, когда я запускаю приложение с несовместимыми типами данных для моей схемы потока. Сообщения об ошибках выглядят так:

[2018-07-25_10-14-37_224] ERROR 
{org.wso2.extension.siddhi.map.json.sourcemapper.JsonSourceMapper} - 
Json message {"event":{"orderID":"210000000016183","timestamps": 
[1532538627000],"earliestTime":1532538627000}} contains incompatible 
attribute types and values. Value 210000000016183 is not compatible with 
type LONG. Hence dropping the message. (Encoded) 

Однако, если схема настроена правильно, при запуске приложения я не получаю никаких выходных данных. Я действительно не знаю, как это понять. Когда я пытаюсь отладить это, помещая точку останова в строку, включая «вставить в», отладчик никогда не останавливается на этой строке.

Может ли кто-нибудь дать представление о том, как подойти к этой проблеме?


person Dae Woon Koo    schedule 25.07.2018    source источник


Ответы (1)


Мы добавили поддержку объектов для расширения json mapper в последний выпуск расширения. Загрузите расширение [1] и замените jar-файл siddhi-map-json в / lib.

[1] https://store.wso2.com/store/assets/analyticsextension/details/0e6a6b38-f1d1-49f5-a685-e8c16741494d.

С уважением, Раминду.

person Ramindu De Silva    schedule 01.08.2018