Использование преобразования потока KSQL и пользовательских функций для развертывания моделей машинного обучения в реальном времени

В Vrbo мы используем Apache Kafka в качестве основы для нашей потоковой архитектуры. Нам также нравится развертывать модели машинного обучения, чтобы делать прогнозы в реальном времени для наших потоков данных. Confluent KSQL предоставляет простой в использовании интерактивный интерфейс SQL для выполнения потоковой обработки в Kafka. Ниже мы покажем, как построить модель на Python и использовать модель в KSQL для прогнозирования на основе потока данных в Kafka. Мы используем Predictive Model Markup Language (PMML), чтобы дать возможность обучать модель с помощью библиотеки Python Scikit-learn, но выполнять вывод модели в KSQL на основе Java. Ниже мы дадим пошаговое руководство, показывающее шаги, необходимые для локального запуска простого примера. KSQL и его определяемые пользователем функции (UDF) являются ключевыми частями этой реализации, которые интересны даже вне контекста машинного обучения.

Настройка KSQL

Чтобы запустить KSQL, вы можете либо скачать Confluent Platform, либо использовать Docker. В дальнейшем я буду использовать версию Docker. Создайте следующий docker-compose.yml файл для быстрого начала работы с Docker:

Этот файл предполагает, что у вас есть подкаталог ./volume в текущем каталоге. После docker-compose up -d ваши серверы Kafka и KSQL будут запущены. Команда docker-compose ps проверит это.

Обучение модели

Мы сделаем то, что часто является приветственным миром моделей машинного обучения: мы обучим модель на наборе данных радужной оболочки глаза. В этом наборе данных у нас есть 150 образцов данных по трем видам ириса: Iris setosa, Iris virginica и Iris versicolor. Каждый образец имеет четыре характеристики: длину чашелистика, ширину чашелистика, длину лепестка и ширину лепестка. Мы обучим модель логистической регрессии, чтобы классифицировать, к какому виду ириса относится образец, на основе четырех характеристик. Как упоминалось выше, мы используем PMML для обучения модели на Python, но запускаем ее на Java / KSQL. В частности, мы используем проект JPMML-Sklearn, чтобы дать нам именно то, что нам нужно в данном случае. Блокнот Jupyter для обучения модели можно найти здесь, но мы также включим приведенный ниже код.

Помимо некоторых обычных библиотек, которые, как ожидается, будут под рукой специалиста по данным (например, pandas и sklearn), нам нужно pip install пакет sklearn2pmml перед запуском следующего.

Реализация модели в KSQL

Оценка модели будет производиться в KSQL путем реализации модели внутри KSQL UDF. Я создал проект Maven для создания необходимых jar-файлов. Это основной UDF:

Вы заметите, что этот код использует файл iris-pipeline.pmml. Чтобы получить это, нам нужно преобразовать файл pickle конвейера iris-pipeline.pkl.z, созданный в предыдущем разделе. Для конвертации вам необходимо клонировать и собрать проект JPMML-Sklearn (или просто скачать jar здесь). Вы также можете полностью пропустить следующий шаг, загрузив уже созданный PMML-файл. С банкой в ​​руке команда для запуска:

java -jar files/jars/jpmml-sklearn-executable-1.5-SNAPSHOT.jar --pkl-input ./models/iris-pipeline.pkl.z --pmml-output ./models/iris-pipeline.pmml

Клонируйте, а затем создайте этот проект, используя mvn clean package. Вы можете проверить, что файл PMML работает правильно, используя this:

java -cp target/ksql-ml-pmml-example-0.0.1-jar-with-dependencies.jar ksqlexample.pmml.LocalRun models/iris-pipeline.pmml 7.0,3.2,4.7,4.4

Переместите файл PMML и jar в каталог, который вы смонтировали в контейнере ksql-server.

cp models/iris-pipeline.pmml docker/volume/
cp target/ksql-ml-pmml-example-0.0.1-jar-with-dependencies.jar docker/volume/

Перезапустите сервер KSQL следующим образом.

docker-compose up -d ksql-server

Демонстрация оценки модели в реальном времени

KSQL должен быть запущен и иметь доступ к модели и UDF. Теперь мы готовы отправить данные в тему Kafka и посмотреть, как модель машинного обучения делает прогнозы через KSQL.

Во-первых, мы должны создать тему. Используя Docker, команда выглядит следующим образом:

docker-compose exec broker kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic iris_data

В другом терминале запустите KSQL CLI.

docker-compose exec ksql-cli ksql http://ksql-server:8088

поток - это эквивалент KSQL таблицы в SQL. Вы можете создать его из интерфейса командной строки KSQL, введя следующую команду.

CREATE STREAM iris_stream (sepal_length double,sepal_width double,petal_length double,petal_width double) WITH (kafka_topic='iris_data', value_format='json');

Наконец, мы реализуем нашу модель для входящих данных, вызывая UDF в KSQL.

select ksqlexample.pmml.IrisLogReg(sepal_length,sepal_width, petal_length,petal_width) from iris_stream;

Вот и все! Теперь мы можем начать публиковать данные в теме и наблюдать, как модель делает прогнозы. Запустите производитель консоли, а затем вставьте данные примера в.

docker-compose exec broker kafka-console-producer  --broker-list broker:29092 --topic iris_data
>{"sepal_length": 5.1,"sepal_width": 3.5,"petal_length": 1.4,"petal_width": 0.2}
>{"sepal_length": 5.9,"sepal_width": 3.0,"petal_length": 5.1,"petal_width": 1.8}
>{"sepal_length": 6.3,"sepal_width": 2.5,"petal_length": 4.9,"petal_width": 1.5}

Если вы снова переключитесь на терминал, на котором запущен KSQL CLI, вы должны увидеть, как модель делает прогнозы, для какого класса (0, 1 или 2) классификатор предсказывает, что это будет образец. Ниже приведено видео для иллюстрации.

Вышеупомянутое можно было бы легко объединить с различными запросами KSQL для выполнения манипуляций с данными перед выполнением оценки модели. Например, если бы мы хотели выполнить вывод модели только для образцов, у которых длина чашелистика больше 3,0, мы могли бы создать поток следующим образом:

CREATE STREAM iris_filtered AS SELECT * FROM iris_stream WHERE sepal_width > 3.0;

Или если данные в теме Kafka были опубликованы в сантиметрах, а модель ожидает, что они будут в дюймах:

CREATE STREAM iris_to_inches AS SELECT sepal_length/2.54, sepal_width/2.54, petal_length/2.54, petal_width/2.54 FROM iris_stream;

Заключение

KSQL позволяет выполнять SQL-подобные запросы к потоку данных, чтобы более просто преобразовать поток и напрямую передать его в модель машинного обучения. В дополнение к существующему кластеру Kafka требуются дополнительные накладные расходы, поскольку вам необходимо иметь запущенные серверы KSQL, но взамен вы можете отправить простой SQL-запрос, на основе которого KSQL построит и развернет задание KStreams, которое может включать машинное обучение. как в примере выше.

Здесь мы использовали PMML для формата сериализации модели. Мы также экспериментировали с использованием MLeap вместо PMML. Хотя мы обнаружили, что UDF KSQL также могут быть написаны на Scala, была определенная ошибка отражения Scala, которая оказалась проблематичной во время десериализации модели MLeap в UDF. H2O - еще один естественный фреймворк машинного обучения, который можно здесь использовать. Он создает модели, которые естественно работают на Java и, следовательно, хорошо интегрируются с KSQL, как показано в этом блоге Confluent. Блог Confluent был очень полезен при создании пошагового руководства, представленного выше, и наш вклад можно рассматривать как дополнительный, поскольку пример требует меньше с точки зрения настройки и предлагает демонстрацию кросс-языковой функциональности (с Python на Java).