Я пишу набор методов, расширяющих API Spark RDD. Мне нужно реализовать общий метод хранения RDD, и для начала я попытался обернуть saveAsCassandraTable
spark-cassandra-connector, но безуспешно.
Вот часть «расширение API RDD»:
object NewRDDFunctions {
implicit def addStorageFunctions[T](rdd: RDD[T]):
RDDStorageFunctions[T] = new RDDStorageFunctions(rdd)
}
class RDDStorageFunctions[T](rdd: RDD[T]) {
def saveResultsToCassandra() {
rdd.saveAsCassandraTable("ks_name", "table_name") // this line produces errors!
}
}
...и импортировать объект как: import ...NewRDDFunctions._
.
Отмеченная строка выдает следующие ошибки:
Error:(99, 29) could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[T]
rdd.saveAsCassandraTable("ks_name", "table_name")
^
Error:(99, 29) not enough arguments for method saveAsCassandraTable: (implicit connector: com.datastax.spark.connector.cql.CassandraConnector, implicit rwf: com.datastax.spark.connector.writer.RowWriterFactory[T], implicit columnMapper: com.datastax.spark.connector.mapper.ColumnMapper[T])Unit.
Unspecified value parameters rwf, columnMapper.
rdd.saveAsCassandraTable("ks_name", "table_name")
^
Я не понимаю, почему это не работает, поскольку saveAsCassandraTable
предназначен для работы с любым RDD. Какие-либо предложения?
У меня была аналогичная проблема с пример в документации spark-cassandra-connector:
case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))
... и решение состояло в том, чтобы переместить определение класса случая из "основной" функции (но я действительно не знаю, относится ли это к упомянутой проблеме...).