Можно ли использовать reduceBykey для изменения типа и объединения значений — Scala Spark?

В коде ниже я пытаюсь объединить значения:

val rdd: org.apache.spark.rdd.RDD[((String), Double)] =
    sc.parallelize(List(
      (("a"), 1.0),
      (("a"), 3.0),
      (("a"), 2.0)
      ))

val reduceByKey = rdd.reduceByKey((a , b) => String.valueOf(a) + String.valueOf(b))

reduceByValue должен содержать (a, 1,3,2), но получить ошибку времени компиляции:

Multiple markers at this line - type mismatch; found : String required: Double - type mismatch; found : String 
 required: Double

Что определяет тип функции сокращения? Тип не может быть преобразован?

Я мог бы использовать groupByKey для достижения того же результата, но просто хочу понять reduceByKey.


person blue-sky    schedule 17.12.2014    source источник


Ответы (2)


Нет, учитывая rdd типа RDD[(K,V)], reduceByKey примет ассоциативную функцию типа (V,V) => V.

Если мы хотим применить сокращение, которое изменяет тип значений на другой произвольный тип, мы можем использовать aggregateByKey:

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)

Используя функции zeroValue и seqOp, он обеспечивает операцию, подобную сворачиванию, на стороне карты, в то время как ассоциированная функция combOp объединяет результаты seqOp в окончательный результат, как это сделал бы reduceByKey. Как мы можем понять из подписи, в то время как значения коллекции имеют тип V, результат aggregateByKey будет иметь произвольный тип U

Применительно к приведенному выше примеру aggregateByKey будет выглядеть так:

rdd.aggregateByKey("")({case (aggr , value) => aggr + String.valueOf(value)}, (aggr1, aggr2) => aggr1 + aggr2)
person maasg    schedule 17.12.2014

Проблема с вашим кодом заключается в том, что ваш тип значения не соответствует. Вы можете добиться того же результата с помощью reduceByKey, если вы изменили тип значения в своем RDD.

val rdd: org.apache.spark.rdd.RDD[((String), String)] =
    sc.parallelize(List(
      ("a", "1.0"),
      ("a", "3.0"),
      ("a", "2.0")
      ))

    val reduceByKey = rdd.reduceByKey((a , b) => a.concat(b))

Вот тот же пример. Пока функция, которую вы передаете в reduceByKey, принимает два параметра типа Value (в вашем случае Double) и возвращает один параметр того же типа, ваш reduceByKey будет работать.

person Community    schedule 28.01.2015