Если еще в Spark Streaming

У меня есть приложение Spark Streaming, которое считывает данные из ОДНОЙ ТЕМЫ в Kafka, обрабатывает их и вставляет в 2 разных пространства ключей в Cassandra на основе содержимого элемента. Некоторые данные могут находиться в пространстве ключей A, а некоторые — в пространстве ключей B.

Я делаю это в настоящее время, используя операцию фильтра:

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName")
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName")

Таким образом, фильтр применяется к каждому rdd, те элементы, у которых есть поле клиента A, переходят в пространство ключей A, а элементы с полем владельца B переходят в пространство ключей B.

Есть ли более эффективный способ сделать это вместо применения операции фильтра 2 раза (особенно потому, что позже может быть более 2 пространств ключей)? Повысит ли производительность кэширование rdd перед операциями фильтрации?

Повторяю, у меня есть DStream, исходящий от Kafka, я его обрабатываю, а затем в операции foreachRDD у меня есть фрагмент кода сверху, который вставляет данные в Cassandra.

Спасибо


person Srdjan Nikitovic    schedule 18.03.2016    source источник


Ответы (1)


Прежде чем вы сделаете

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, "tableName")
Functions.insertToCassandra(rdd.filter(element=> element.tenant=="B"), keyspace = B, "tableName")

Обязательно выполните rdd.cache()

Когда вы делаете, как указано выше, ваша искра дважды пытается прочитать данные rdd. Spark никогда не сохраняет rdd в памяти, если вы не кэшируете или не транслируете его.

Другой способ может состоять в том, чтобы прочитать все данные сразу, кэшировать их, если набор данных не огромен. Затем используйте groupByKey, где key будет вашим пространством ключей (элементом) в этом случае.

person Abhishek Anand    schedule 18.03.2016
comment
Спасибо за Ваш ответ. Должен ли я добавить rdd.unpersist(true) после преобразования фильтра, чтобы освободить его из памяти? - person Srdjan Nikitovic; 18.03.2016
comment
Вы можете, но если у вас есть этот фрагмент кода внутри метода. Затем, как только вы выйдете за пределы этого метода, он автоматически удалит его из памяти. - person Abhishek Anand; 18.03.2016
comment
Также unpersist, если я правильно помню, сохраняет результат в памяти драйвера и удаляет его из рабочей памяти. уничтожить, с другой стороны, удаляет его отовсюду. - person Abhishek Anand; 18.03.2016
comment
На самом деле я понял, что намного лучше вариант с группировкой по ключу. Но эта операция дает мне (key, Iterable[T]). Как вставить эту итерируемую коллекцию в cassandra, когда cassandra требует RDD[T], и нет возможности переместить Iterable[T] в RDD[T]? Большое тебе спасибо - person Srdjan Nikitovic; 21.03.2016
comment
Ну, вы всегда можете использовать sc.parallelize(YourIterable.toList), но будьте очень осторожны, ваш итерируемый объект будет проходить через драйвер и, следовательно, легко достигнет spark.driver.maxResultSize или памяти драйвера. - person Abhishek Anand; 21.03.2016