Итерация по RDD Iterable в Scala

Итак, я новичок в Scala и только начинаю работать с RDD и функциональными операциями Scala.

Я пытаюсь перебрать значения моих парных RDD и вернуть Var1 со средним значением значений, хранящихся в Var2, применяя определенную функцию average, чтобы окончательный результат представлял собой уникальный список Var1 с одним AvgVar2, связанным с каждым. У меня много проблем с выяснением того, как перебирать значения.

* редактировать: у меня есть следующие объявления типов:

case class ID: Int,  Var1: Int, Var2: Int extends Serializable

У меня есть следующая функция:

  def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, AvgVar2)] = {

    def average(as: Array[Var2]): AvgVar2 = {
       var sum = 0.0
       var i = 0.0
       while (i < as.length) {
           sum += Var2.val
           i += 1
      }
      sum/i
    }

    //My attempt at Scala
    rdds.map(x=> ((x._1),x._2)).groupByKey().map(x=>average(x._1)).collect()
}

Моя попытка Scala пытается сделать следующее:

  1. разделите пару RDD Iterable на пары ключ-значение Var1-Var2.
  2. Сгруппируйте по ключу Var1 и создайте массив связанных Var2.
  3. Примените мою функцию average к каждому массиву Var2
  4. Верните AvgVar2 со связанным Var1 в виде набора RDD.

*Редактировать:

Некоторые примеры входных данных для rdds:

//RDD[(ID,Iterable[(Var1,Var2)...])]
RDD[(1,[(1,3),(1,12),(1,6)])],
RDD[(2,[(2,5),(2,7)])]

Некоторые примеры выходных данных:

//RDD[(Var1, AvgVar2)]
RDD[(1,7),(2,6)]

*Редактировать: Строка рабочего кода Scala:

rdd.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], average(x._2.map(it => it._2).toArray)))

person EliSquared    schedule 01.02.2019    source источник
comment
Есть ли какая-то конкретная причина, по которой вы решили использовать RDD вместо DataFrames?   -  person Pavel    schedule 01.02.2019
comment
Можете ли вы предоставить образец ввода и вывода? Может ли одно и то же значение Var1 встречаться в разных ID?   -  person vdep    schedule 01.02.2019
comment
@vdep, каждое уникальное значение Var1 может быть связано только с одним идентификатором, но я хочу удалить идентификатор в возвращаемом RDD. Я добавил образцы входных и выходных данных в свое редактирование, которые, я надеюсь, прояснят вопрос.   -  person EliSquared    schedule 01.02.2019


Ответы (1)


Учитывая ID = Var1, простой .map() решит его:

def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {

  def average(as: Iterable[(Int, Int)]): Double = {
    as.map(_._2).reduce(_+_)/as.size.toDouble
  }

  rdds.map(x => (x._1, average(x._2)))
}

Вывод:

val input = sc.parallelize(List((1,Iterable((1,3),(1,12),(1,6))), (2, Iterable((2,5),(2,7)))))

scala> foo(input).collect
res0: Array[(Int, Double)] = Array((1,7.0), (2,6.0))

EDITED: (average() с той же подписью):

def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {

  def average(as: Array[Int]): Double = {
    as.reduce(_+_)/as.size.toDouble
  }

  rdds.map(x => (x._1, average(x._2.map(tuple => tuple._2).toArray)))
}
person vdep    schedule 01.02.2019
comment
Я ценю ответ, но не могли бы вы обойтись без изменения формы функции average? Я хочу понять, как создать массив из различных Var2, чтобы я мог применить любую функцию сигнатуры average(as: Array[Var2]) к RDD в заданной форме. Кроме того, несмотря на то, что ID == Var1, это разные типы объектов, поэтому функция выдает ошибку. Я признаю, что это домашнее задание, поэтому мне нужно, чтобы типы точно совпадали. - person EliSquared; 01.02.2019
comment
@EliSquared отредактировал ответ. Я не уверен, что вы подразумеваете под: ID == Var1, это разные типы объектов, поэтому функция выдает ошибку - person vdep; 01.02.2019
comment
Я имею в виду, что ID и Var1 являются определенными классами (см. редактирование над функцией). Теперь единственная ошибка, с которой я сталкиваюсь, связана с x._1, который возвращает ID, когда мне нужно, чтобы он возвращал первый элемент Iterable (Var1). Сейчас я пытаюсь что-то вроде этого: rdds.map(x => (x._2.map(it=> it._1), average(x._2.map(tuple => tuple._2).toArray))), чтобы попытаться выбрать первый элемент итерации, но все равно получаю ошибку типа. - person EliSquared; 02.02.2019
comment
Я действительно заставил его работать, мне просто нужно было добавить: .asInstanceOf[Var1] после (x._2.map(it=> it._1) => (x._2.map(it=> it._1).asInstanceOf[Var1]. См. полностью работающую строку кода выше. При этом, если вы можете оптимизировать эффективность или удобочитаемость, мне было бы интересно. - person EliSquared; 02.02.2019