У меня есть приложение Spark Streaming, которое считывает данные из ОДНОЙ ТЕМЫ в Kafka, обрабатывает их и вставляет в 2 разных пространства ключей в Cassandra на основе содержимого элемента. Некоторые данные могут находиться в пространстве ключей A, а некоторые — в пространстве ключей B.
Я делаю это в настоящее время, используя операцию фильтра:
Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName")
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName")
Таким образом, фильтр применяется к каждому rdd, те элементы, у которых есть поле клиента A, переходят в пространство ключей A, а элементы с полем владельца B переходят в пространство ключей B.
Есть ли более эффективный способ сделать это вместо применения операции фильтра 2 раза (особенно потому, что позже может быть более 2 пространств ключей)? Повысит ли производительность кэширование rdd перед операциями фильтрации?
Повторяю, у меня есть DStream, исходящий от Kafka, я его обрабатываю, а затем в операции foreachRDD у меня есть фрагмент кода сверху, который вставляет данные в Cassandra.
Спасибо