Метод обертывания Scala для параметризованного класса (spark-cassandra-connector)

Я пишу набор методов, расширяющих API Spark RDD. Мне нужно реализовать общий метод хранения RDD, и для начала я попытался обернуть saveAsCassandraTable spark-cassandra-connector, но безуспешно.

Вот часть «расширение API RDD»:

object NewRDDFunctions {
  implicit def addStorageFunctions[T](rdd: RDD[T]):
  RDDStorageFunctions[T] = new RDDStorageFunctions(rdd)
}

class RDDStorageFunctions[T](rdd: RDD[T]) {
  def saveResultsToCassandra() {
    rdd.saveAsCassandraTable("ks_name", "table_name")    // this line produces errors!
  }
}

...и импортировать объект как: import ...NewRDDFunctions._.

Отмеченная строка выдает следующие ошибки:

Error:(99, 29) could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[T]
    rdd.saveAsCassandraTable("ks_name", "table_name")
                            ^

Error:(99, 29) not enough arguments for method saveAsCassandraTable: (implicit connector: com.datastax.spark.connector.cql.CassandraConnector, implicit rwf: com.datastax.spark.connector.writer.RowWriterFactory[T], implicit columnMapper: com.datastax.spark.connector.mapper.ColumnMapper[T])Unit.
Unspecified value parameters rwf, columnMapper.
    rdd.saveAsCassandraTable("ks_name", "table_name")
                            ^

Я не понимаю, почему это не работает, поскольку saveAsCassandraTable предназначен для работы с любым RDD. Какие-либо предложения?


У меня была аналогичная проблема с пример в документации spark-cassandra-connector:

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))

... и решение состояло в том, чтобы переместить определение класса случая из "основной" функции (но я действительно не знаю, относится ли это к упомянутой проблеме...).


person bmscicho    schedule 27.06.2015    source источник


Ответы (1)


saveAsCassandraTable требует 3 неявных параметра. Первый (connector) имеет значение по умолчанию, последние два (rwf и columnMapper) не находятся в неявной области видимости в вашем методе saveResultsToCassandra, поэтому ваш метод не компилируется.

Посмотрите на этот ответ на другой вопрос, если вам нужна дополнительная информация о имплицитах.

Превращение вашего saveResultsToCassandra в функцию ниже должно работать, если вы ранее определили свои таблицы (TableDef).

def saveResultsToCassandra()(
  // implicit parameters as a separate list!
  implicit rwf: RowWriterFactory[T], 
  columnMapper: ColumnMapper[T]
) {
  rdd.saveAsCassandraTable("ks_name", "table_name")
}
person Peter Neyens    schedule 27.06.2015
comment
Эта часть кода работает, но когда я использую свой метод saveResultsToCassandra для RDD, он вызывает: Unspecified value parameters rwf, columnMapper. Почему это не вызывает никаких ошибок, когда я использую saveAsCassandraTable("ks_name", "table_name") напрямую (без моей оболочки)? Я не хочу определять какие-либо таблицы, так как saveAsCassandraTable сделает это за меня. Я посмотрел на предоставленный вами ответ, но я не знаю, как он применим в моем случае. - person bmscicho; 27.06.2015
comment
Мне пришлось добавить дополнительные скобки прямо перед (implicit rwf: RowWriterFactory[T], columnMapper: ColumnMapper[T]), спасибо! - person bmscicho; 27.06.2015