Допустим, у меня есть подобная структура данных, где ts - это временная метка.
case class Record(ts: Long, id: Int, value: Int)
Учитывая большое количество этих записей, я хочу получить запись с самой высокой отметкой времени для каждого идентификатора. Используя API RDD, я думаю, что следующий код выполняет свою работу:
def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
records.keyBy(_.id).reduceByKey{
(x, y) => if(x.ts > y.ts) x else y
}.values
}
Точно так же это моя попытка с наборами данных:
def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
records.groupByKey(_.id).mapGroups{
case(id, records) => {
records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
}
}
}
Я пытаюсь понять, как добиться чего-то подобного с фреймами данных, но безрезультатно. Я понимаю, что могу выполнить группировку с помощью:
records.groupBy($"id")
Но это дает мне RelationGroupedDataSet, и мне не ясно, какую функцию агрегации мне нужно написать, чтобы достичь того, чего я хочу — все примеры агрегаций, которые я видел, сосредоточены на возврате только одного агрегируемого столбца, а не всей строки.
Можно ли добиться этого с помощью фреймов данных?