Узнайте, как загрузить сериализованную модель Spark ML, хранящуюся в формате пакета MLeap, в файловой системе Databricks (DBFS) и использовать ее для классификации новых потоковых данных, проходящих через платформу StreamSets DataOps.
В своих предыдущих блогах я показал, как легко можно расширить возможности StreamSets Transformer с помощью Scala и PySpark. Если вы еще не просматривали блоги обучить модель Spark ML Random Forest Regressor, сериализовать обученную модель, обучить модель логистической регрессии NLP, я настоятельно рекомендую ознакомиться с ней, прежде чем продолжить, потому что этот блог основан на них.
Хорошо, давайте приступим к делу!
Потоковые данные: от Твиттера до Кафки
Я разработал этот конвейер StreamSets Data Collector для приема и преобразования твитов и их хранения в Kafka. Этот конвейер является основным источником наших потоковых данных, для которых мы будем выполнять анализ тональности во втором конвейере.
Обзор конвейера:
Вот пример оригинального ответа Twitter Search API, полученного источником HTTP-клиента.
А вот пример трансформированного твита, написанного Кафке.
Классификация потоковых данных: Kafka to Spark ML Model to Databricks
Как подробно описано в моих предыдущих блогах, давайте предположим, что вы обучили и сериализовали модель в формате пакета MLeap, и она хранится в DBFS, как показано ниже.
Далее… Я разработал этот конвейер StreamSets Transformer, работающий на кластере Databricks.
В основном он берет входные данные (кадр) и, если он содержит столбец «текст» (твит), загружает модель NLP («spark_nlp_model.zip») и классифицирует каждый твит. Затем он создает новый фрейм данных, в котором только твит и его классификация хранятся в столбце «прогноз». (Обратите внимание, что вы также можете передать/включить все столбцы, присутствующие во входном фрейме данных, а не только два — «текст» и «прогноз».)
Анализ блоков данных
После того как твиты вместе с их классификацией будут сохранены в файловой системе Databricks, они готовы к запросам в Databricks Notebook.
Здесь я создал фрейм данных, который считывает все файлы Parquet, выводимые вторым конвейером в расположении DBFS /dash/nlp/, и показывает, как выглядят данные.
Запросить твиты и их классификацию
Здесь я создал временную таблицу, которая считывает те же данные, хранящиеся в расположении /dash/nlp/ DBFS, и агрегированный запрос, показывающий общее количество положительных твитов по сравнению с негативные твиты.
Создать временную таблицу и агрегировать данные
В демонстрационном видео я также показал, как создавать и запускать структурированные потоковые запросы в Databricks для автоматического обновления подсчетов — общего количества позитивных и негативных твитов — без необходимости вручную обновлять исходный фрейм данных по мере поступления новых данных. поступает из второго трубопровода.
Хорошие новости!
Основываясь на моей модели и собранных данных, кажется, что больше положительных настроений, чем отрицательных настроений, когда дело касается # карантинная жизнь хэштег. Это что-то, чтобы чувствовать себя хорошо! 🙂
Однако, если честно и справедливо, само собой разумеется, что точность модели зависит от размера и качества обучающих и тестовых наборов данных, а также от проектирования функций и настройки гиперпараметров, что не совсем является целью этого блога; скорее, чтобы продемонстрировать, как можно использовать и расширять платформу StreamSets DataOps для различных вариантов использования.
Узнайте больше о StreamSets для Databricks и StreamSets DataOps Platform, которые доступны на Microsoft Azure Marketplace и AWS Marketplace.
Первоначально опубликовано на https://streamsets.com 9 апреля 2020 г.