Масштабный запуск пользовательских функций кода Scala в облаке данных Snowflake

Обзор

В прошлом году Snowflake анонсировала множество функций, таких как поддержка неструктурированных данных, Snowtire, Snowpark, Snowsight и т. Д. В этом случае одна из ожидаемых функций SnowPark теперь доступна для предварительного просмотра. До недавнего времени у Snowflake не было встроенной интеграции с приложениями машинного обучения, а также поддержки наличия инфраструктуры качества данных на этапе извлечения / приема. CI / CD для конвейеров данных, происхождение на уровне атрибутов было довольно сложным и должно было зависеть от услуг других поставщиков. Функция SnowPark Scala API открыла двери для безграничных возможностей.

Сноупарк

Ниже приведены некоторые из ключевых функций, поддерживаемых SnowPark на сегодняшний день.

  • Выполнение Snowflake SQL с помощью программирования.
  • Ленивый анализ на сервере уменьшает объем данных, передаваемых между вашим клиентом и базой данных Snowflake.

  • Пользовательские функции, созданные на стороне клиента, можно отправить на сервер и запустить на нем.
  • Пользовательские модели машинного обучения можно обучать на других языках, таких как Python, и эти модели можно экспортировать в виде jar-файла и вызывать.

Код Scala, использованный в этой статье, хранится в Github. Хотя SnowPark находится на начальной стадии разработки, давайте посмотрим на некоторые из его текущих функций.

Создать сеанс

Создайте сеанс и выберите стол.

val session = Session.builder.configs(configMap).create
val df = session.sql( query = "select * from region")
df.show()

Присоединиться

Создайте две таблицы в Snowflake, соедините их и сохраните результат в другой таблице.

val reg = session.sql( query = "select * from region")
val nat = session.sql( query = "select * from nation")
val joined_df = reg.join(nat,reg("R_REGIONKEY") ===  nat("N_REGIONKEY"),"inner")
joined_df.show()
joined_df.write.mode(Overwrite).saveAsTable("joined_df")

Присоединенный результат

Ниже приведен SQL-код снежинки, созданный сервером для указанной выше операции.

Финальная таблица создана в снежинке.

Читать JSON API

Веб-приложения могут изначально записывать данные в снежинки, и это изменит способ построения конвейеров данных. Город Нью-Йорк поддерживает открытый API, и давайте читать его как JSON.

Прочтите JSON и создайте фрейм данных

def getJsonData =  {
  val url = "https://data.cityofnewyork.us/resource/2npr-yv2b.json"
  scala.io.Source.fromURL(url).mkString
}

val df = session.createDataFrame(Seq(getJsonData))
df.show()

После того, как JSON прочитан и вся очистка / обработка завершена, фрейм данных можно объединить / вставить в таблицу или переместить во внутренний этап снежинки. Ниже показан этап, созданный для хранения файла json.

После загрузки перечислите файлы через API сеанса.

session.sql(s"ls @$dataStageName").show()

Помещение файла

Jar-файлы зависимостей и экспортированные пользовательские модели jar-файлов могут быть размещены во внутреннем хранилище и могут использоваться для операций UDF.

Вывод

API Snowpark Scala открывает путь к безграничным возможностям, например, позволяя организациям внедрять свои собственные решения, созданные для конкретной цели, в конвейеры данных снежинки без особых переделок или усилий по разработке. Кроме того, на платформе данных Snowflake можно увидеть внедрение специализированных фреймворков качества данных на основе java / scala, выводы машинного обучения (позволяющие сэкономить почти 80% затрат) и совместное использование предсказаний машинного обучения в режиме реального времени.