Spark SQL + Cassandra: плохая производительность

Я только начинаю использовать Spark SQL + Cassandra и, возможно, упускаю что-то важное, но один простой запрос занимает ~ 45 секунд. Я использую библиотеку cassanda-spark-connector и запускаю локальный веб-сервер, на котором также размещается Spark. Итак, моя установка примерно такая:

В сбт:

    "org.apache.spark" %% "spark-core" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
    "org.apache.spark" %% "spark-sql" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
    "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3" excludeAll(ExclusionRule(organization = "org.slf4j"))

В коде у меня есть синглтон, в котором размещены SparkContext и CassandraSQLContetx. Затем он вызывается из сервлета. Вот как выглядит код синглтона:

object SparkModel {

  val conf =
    new SparkConf()
      .setAppName("core")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "127.0.0.1")

  val sc = new SparkContext(conf)
  val sqlC = new CassandraSQLContext(sc)
  sqlC.setKeyspace("core")

  val df: DataFrame = sqlC.cassandraSql(
    "SELECT email, target_entity_id, target_entity_type " +
    "FROM tracking_events " +
    "LEFT JOIN customers " +
    "WHERE entity_type = 'User' AND entity_id = customer_id")
}

А вот как я его использую:

get("/spark") {
  SparkModel.df.collect().map(r => TrackingEvent(r.getString(0), r.getString(1), r.getString(2))).toList
}

Cassandra, Spark и веб-приложение работают на одном хосте в виртуальной машине на моем Macbook Pro с приличными характеристиками. Запросы Cassandra сами по себе занимают 10-20 миллисекунд.

Когда я вызываю эту конечную точку в первый раз, для возврата результата требуется 70-80 секунд. Последующие запросы занимают ~45 секунд. Лог последующей операции выглядит так:

12:48:50 INFO  org.apache.spark.SparkContext - Starting job: collect at V1Servlet.scala:1146
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Got job 1 (collect at V1Servlet.scala:1146) with 1 output partitions (allowLocal=false)
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(collect at V1Servlet.scala:1146)
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Parents of final stage: List()
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Missing parents: List()
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146), which has no missing parents
12:48:50 INFO  org.apache.spark.storage.MemoryStore - ensureFreeSpace(18696) called with curMem=26661, maxMem=825564856
12:48:50 INFO  org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 18.3 KB, free 787.3 MB)
12:48:50 INFO  org.apache.spark.storage.MemoryStore - ensureFreeSpace(8345) called with curMem=45357, maxMem=825564856
12:48:50 INFO  org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 787.3 MB)
12:48:50 INFO  o.a.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:56289 (size: 8.1 KB, free: 787.3 MB)
12:48:50 INFO  org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:874
12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146)
12:48:50 INFO  o.a.s.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
12:48:50 INFO  o.a.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, NODE_LOCAL, 59413 bytes)
12:48:50 INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
12:48:50 INFO  com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO  o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
12:49:35 INFO  o.a.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 45199 ms on localhost (1/1)
12:49:35 INFO  o.a.s.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool 
12:49:35 INFO  o.a.spark.scheduler.DAGScheduler - ResultStage 1 (collect at V1Servlet.scala:1146) finished in 45.199 s

Как видно из лога, самые длинные паузы между этими 3 строками (21 + 24 секунды):

12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
12:49:11 INFO  o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
12:49:35 INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver

Видимо, я что-то делаю не так. Что это? Как я могу улучшить это?

EDIT: Важное дополнение: размер таблиц крошечный (~200 записей для tracking_events, ~20 для customers), поэтому чтение их целиком в память не должно занимать много времени. И это локальная установка Cassandra, без кластера и сети.


person Haspemulator    schedule 17.08.2015    source источник


Ответы (1)


  "SELECT email, target_entity_id, target_entity_type " +
    "FROM tracking_events " +
    "LEFT JOIN customers " +
    "WHERE entity_type = 'User' AND entity_id = customer_id")

Этот запрос будет считывать все данные как из таблицы tracking_events, так и из таблицы customers. Я бы сравнил производительность с выполнением SELECT COUNT(*) для обеих таблиц. Если они значительно отличаются, то может возникнуть проблема, но я предполагаю, что это просто количество времени, необходимое для полного чтения обеих таблиц в памяти.

Есть несколько ручек для настройки того, как выполняется чтение, и, поскольку значения по умолчанию ориентированы на гораздо больший набор данных, вы можете изменить их.

spark.cassandra.input.split.size_in_mb  approx amount of data to be fetched into a Spark partition  64 MB
spark.cassandra.input.fetch.size_in_rows    number of CQL rows fetched per driver request   1000

Я хотел бы убедиться, что вы генерируете столько задач, сколько у вас есть ядер (как минимум), чтобы вы могли использовать все свои ресурсы. Для этого уменьшите input.split.size

Размер выборки определяет, сколько строк одновременно выгружается ядром исполнителя, поэтому его увеличение может увеличить скорость в некоторых случаях использования.

person RussS    schedule 17.08.2015
comment
Отличный ответ Расс! Я заметил такое же снижение производительности, но предположил, что это произошло из-за того, что мой кластер Spark работал на моей локальной виртуальной машине. - person Aaron; 17.08.2015
comment
По какой-то причине я не могу запустить свой экземпляр Cassandra прямо сейчас, но важно то, что эти 2 таблицы крошечные. tracking_events имеет 200 записей, а customers всего около 20. Это не может занять так много времени из-за загрузки данных. - person Haspemulator; 17.08.2015
comment
Почему бы вам не проверить пользовательский интерфейс, он должен разбить тайминг именно для вас. - person RussS; 17.08.2015
comment
Я не знаю, как это сделать. Я не запускаю автономный Spark, а только как зависимость от своего веб-приложения. Я пытался зайти на localhost:4040, как задокументировано, но там ничего нет. - person Haspemulator; 17.08.2015
comment
Хорошо, удалось запустить Кассандру. С таким запросом, как SELECT Count(*) FROM tracking_events, обращение туда и обратно занимает ~2 секунды, намного быстрее. - person Haspemulator; 17.08.2015
comment
Не тот запрос, который вам нужен для запуска кода через spark. Сравните производительность с: val rdd1 =sc.cassandraTable(ks,tracking_events) val rdd2 = sc.cassandraTable(ks,customers) rdd1.count rdd2.count // Также должно быть перетасовкой здесь, но мы можем игнорировать это сейчас - person RussS; 17.08.2015
comment
Пользовательский интерфейс работает только во время работы приложения, если вы не сохраняете журнал событий с помощью spark.eventLog.enabled = true + spark.eventLog.dir = some dir. Дополнительные сведения см. в документации искры. - person RussS; 17.08.2015
comment
Работа sc.cassandraTable("core", "tracking_events").count + sc.cassandraTable("core", "customers").count занимает 2,5 секунды. - person Haspemulator; 17.08.2015
comment
Тогда кажется, что перераспределение Spark будет медленной частью. Мы мало что можем сделать, чтобы исправить это, кроме как свести к минимуму ваш параллелизм по умолчанию (поскольку задание такое маленькое). Если бы вы могли видеть пользовательский интерфейс, это могло бы дать вам представление о том, сколько задач создается во время события перераспределения. Вы, вероятно, хотите только 1 для такой небольшой работы. Таким образом, параллелизм перемешивания по умолчанию равен 1. spark.default.parallelism является параметром conf для этого. - person RussS; 17.08.2015
comment
Установка spark.default.parallelism на 1 ничего не изменила. И мне удалось запустить пользовательский интерфейс и добавить скриншот со сводкой задания. imgur.com/Y05r7FX - person Haspemulator; 17.08.2015
comment
Поскольку есть только одна задача, я боюсь, что это конец пути, насколько я вижу. По какой-то причине это декартово произведение должно быть очень медленным ... Я не могу представить, с чем здесь можно по-настоящему возиться. С другой стороны, я предполагаю, что большая часть этого является накладной и, вероятно, не изменится, если вы значительно увеличите объем данных. - person RussS; 17.08.2015
comment
Это нормально, но, думаю, меня совершенно сбила с толку применимость Spark к запросам в реальном времени. Это на самом деле очень печально. Мне нужно что-то, что может ответить менее чем за 1 секунду. - person Haspemulator; 18.08.2015
comment
Spark на самом деле не предназначен для запросов в реальном времени, это скорее среда пакетной аналитики, может быть, вы хотели найти что-то вроде Solr?/ElasticSerach? - person RussS; 18.08.2015
comment
для определения размера в spark.cassandra.input.split.size_in_mb посетите [ссылка] (stackoverflow.com/a/31586690/5035204) - person karmadip dodiya; 19.09.2015