Узнайте, как загрузить сериализованную модель Spark ML, хранящуюся в формате пакета MLeap, в файловой системе Databricks (DBFS) и использовать ее для классификации новых потоковых данных, проходящих через платформу StreamSets DataOps.

В своих предыдущих блогах я показал, как легко можно расширить возможности StreamSets Transformer с помощью Scala и PySpark. Если вы еще не просматривали блоги обучить модель Spark ML Random Forest Regressor, сериализовать обученную модель, обучить модель логистической регрессии NLP, я настоятельно рекомендую ознакомиться с ней, прежде чем продолжить, потому что этот блог основан на них.

Хорошо, давайте приступим к делу!

Потоковые данные: от Твиттера до Кафки

Я разработал этот конвейер StreamSets Data Collector для приема и преобразования твитов и их хранения в Kafka. Этот конвейер является основным источником наших потоковых данных, для которых мы будем выполнять анализ тональности во втором конвейере.

Обзор конвейера:

Вот пример оригинального ответа Twitter Search API, полученного источником HTTP-клиента.

А вот пример трансформированного твита, написанного Кафке.

Классификация потоковых данных: Kafka to Spark ML Model to Databricks

Как подробно описано в моих предыдущих блогах, давайте предположим, что вы обучили и сериализовали модель в формате пакета MLeap, и она хранится в DBFS, как показано ниже.

Далее… Я разработал этот конвейер StreamSets Transformer, работающий на кластере Databricks.

В основном он берет входные данные (кадр) и, если он содержит столбец «текст» (твит), загружает модель NLP («spark_nlp_model.zip») и классифицирует каждый твит. Затем он создает новый фрейм данных, в котором только твит и его классификация хранятся в столбце «прогноз». (Обратите внимание, что вы также можете передать/включить все столбцы, присутствующие во входном фрейме данных, а не только два — «текст» и «прогноз».)

Анализ блоков данных

После того как твиты вместе с их классификацией будут сохранены в файловой системе Databricks, они готовы к запросам в Databricks Notebook.

Здесь я создал фрейм данных, который считывает все файлы Parquet, выводимые вторым конвейером в расположении DBFS /dash/nlp/, и показывает, как выглядят данные.

Запросить твиты и их классификацию

Здесь я создал временную таблицу, которая считывает те же данные, хранящиеся в расположении /dash/nlp/ DBFS, и агрегированный запрос, показывающий общее количество положительных твитов по сравнению с негативные твиты.

Создать временную таблицу и агрегировать данные

В демонстрационном видео я также показал, как создавать и запускать структурированные потоковые запросы в Databricks для автоматического обновления подсчетов — общего количества позитивных и негативных твитов — без необходимости вручную обновлять исходный фрейм данных по мере поступления новых данных. поступает из второго трубопровода.

Хорошие новости!

Основываясь на моей модели и собранных данных, кажется, что больше положительных настроений, чем отрицательных настроений, когда дело касается # карантинная жизнь хэштег. Это что-то, чтобы чувствовать себя хорошо! 🙂

Однако, если честно и справедливо, само собой разумеется, что точность модели зависит от размера и качества обучающих и тестовых наборов данных, а также от проектирования функций и настройки гиперпараметров, что не совсем является целью этого блога; скорее, чтобы продемонстрировать, как можно использовать и расширять платформу StreamSets DataOps для различных вариантов использования.

Узнайте больше о StreamSets для Databricks и StreamSets DataOps Platform, которые доступны на Microsoft Azure Marketplace и AWS Marketplace.

Первоначально опубликовано на https://streamsets.com 9 апреля 2020 г.