Связать механизм SQL с конвейером машинного обучения может быть не очень интуитивно. Но на самом деле в Spark библиотека ML поддерживается SparkSQL. Более старый компонент MLlib основан на работе с RDD. Поскольку сообщество Spark продолжает улучшать среду выполнения SparkSQL и реализацию катализатора, предпринимаются попытки перенести его на операции на основе DataFrame / Dataset, которые полагаются на SparkSQL.

Из-за возросшей сложности SparkSQL разработчики машинного обучения могут быть не в состоянии понять детали. Иногда это затрудняет создание эффективного конвейера машинного обучения, если вы следуете только пути машинного обучения в Spark.

Одним из таких примеров является конфигурация, которую я добавил в SparkSQL несколько месяцев назад: spark.sql.constraintPropagation.enabled. Эта конфигурация SQL используется для управления поведением распространения ограничений во время оптимизации запроса.

Посмотрим описание этой конфигурации:

«Когда это правда, оптимизатор запросов выявляет и распространяет ограничения данных в плане запроса, чтобы оптимизировать их. Распространение ограничений иногда может быть затратным с вычислительной точки зрения для определенных видов планов запросов (например, с большим количеством предикатов и псевдонимов), что может отрицательно повлиять на общее время выполнения ».

Прежде чем запрос будет запущен, он проходит через несколько фраз. Оптимизация - это одна из фраз. Оптимизация запросов связана с повторным преобразованием исходного плана запроса, удалением ненужных столбцов, уменьшением объема данных, считываемых из хранилища данных, и т. Д.

Чтобы оптимизация работала правильно, нам нужно предоставить оптимизатору некоторую информацию. Ограничения - это инварианты, которые верны для всех строк, созданных оператором в SparkSQL.

Например, в запросе типа SELECT * FROM sales WHERE sales.price ›10000 мы можем гарантировать, что все строки из оператора Filter удовлетворяют price› 10000. Это так называемые ограничения.

Хотя ограничения полезны при оптимизации запросов, процесс получения полных ограничений (т. Е. Распространение ограничений) может потребовать больших затрат вычислений, если у вас очень большой запрос.

Нередко складывать много этапов машинного обучения в конвейер машинного обучения. Поскольку алгоритмы фактически выполняются операциями DataFrame / Dataset в Spark ML, для него используется одна и та же оптимизационная фраза.

Рассмотрим следующий конвейер машинного обучения:

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.internal.SQLConf
val df = (1 to 40).foldLeft(Seq((1, “foo”), (2, “bar”), (3, “baz”)).toDF(“id”, “x0”))((df, i) => df.withColumn(s”x$i”, $”x0"))
val indexers = df.columns.tail.map(c => new StringIndexer()
 .setInputCol(c)
 .setOutputCol(s”${c}_indexed”)
 .setHandleInvalid(“skip”))
val encoders = indexers.map(indexer => new OneHotEncoder()
 .setInputCol(indexer.getOutputCol)
 .setOutputCol(s”${indexer.getOutputCol}_encoded”)
 .setDropLast(true))
val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)
val startTime = System.nanoTime
pipeline.fit(df).transform(df).show
val runningTime = System.nanoTime — startTime

Чтобы закончить это раньше, нужно около получаса. Большую часть времени тратится локально на оптимизацию огромного плана запроса. Времени, необходимого для запуска алгоритма машинного обучения, на самом деле немного.

После добавления этой конфигурации spark.sql.constraintPropagation.enabled мы можем просто отключить распространение ограничений,

spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

Это может сократить время работы указанного выше конвейера до менее чем полминуты.

В следующий раз, когда вы создадите конвейер машинного обучения и обнаружите, что производительность низкая, возможно, это просто потому, что оптимизация запроса требует слишком много времени. Вы можете попробовать отключить эту конфигурацию и посмотреть, улучшится ли ситуация.