Сортировка RDD после группировки и суммирования

Я пытаюсь провести некоторый анализ некоторых данных о визге. Данные структурированы следующим образом:

>>> yelp_df.printSchema()
root
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhoods: string (nullable = true)
 |-- open: boolean (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- state: string (nullable = true)

Я хочу подсчитать записи для каждого состояния для всего с 10 или более обзорами, которые в настоящее время открыты, и найти состояние с третьим по величине количеством. Сначала я сделал

>>> revDF = yelp_df.filter(yelp_df.review_count > 9)
>>> openDF = revDF.filter(revDF.open == True)
>>> openDF.groupBy("state").agg({"review_count":"sum"}).collect()

что дает это

[Row(state=u'MN', SUM(review_count#16)=3470), Row(state=u'GA', SUM(review_count#16)=5764), Row(state=u'TX', SUM(review_count#16)=1778), Row(state=u'AZ', SUM(review_count#16)=72214), Row(state=u'NY', SUM(review_count#16)=4081), Row(state=u'OR', SUM(review_count#16)=2125), Row(state=u'ID', SUM(review_count#16)=429), Row(state=u'CA', SUM(review_count#16)=1876), Row(state=u'CO', SUM(review_count#16)=6720), Row(state=u'WA', SUM(review_count#16)=525), Row(state=u'LA', SUM(review_count#16)=8394)]

теперь, после сохранения в summedDF,

summedDF.sort(summedDF.state.desc()).collect()

сортирует его по состоянию просто отлично, но (неудивительно)

summedDF.sort(summedDF.SUM(review_count#16).desc()).collect()

не работает. На самом деле он даже не запускается. У меня есть правильное количество скобок, но вместо выполнения он переходит на следующую строку с ... раньше, ожидая нового ввода.

Как мне это сделать и что происходит с невыполнением? а что с №16?


person hedgedandlevered    schedule 04.05.2016    source источник


Ответы (1)


Изменить: добавлена ​​версия для pyspark.

Я предлагаю вам преобразовать ваш код во что-то вроде:

val finalDF = yelp_df
  .where(col("review_count") > 9 && col("open") === true)
  .groupBy("state")
  .agg(sum("review_count").as("sum_column"))
  .sort(col("sum_column").desc)

Возможно, мы сможем адаптироваться для pyspark:

from pyspark.sql.functions import *
finalDF = yelp_df \
    .where((col("review_count") > 9) & (col("open") == True)) \
    .groupBy("state") \
    .agg(col("state"), sum(col("review_count")).alias("sum_column")) \
    .sort(col("sum_column").desc())

Теперь к вашему вопросу:

что происходит с невыполнением? а что с №16?

Короче говоря, ваша попытка сослаться на столбец с помощью summedDF.SUM(review_count#16) не сработала.

Функция sort использует либо объекты Column (которые можно создать вызовом col("name")), либо непосредственно имя столбца. Однако, когда вы выполняли агрегирование, вы не выбрали имя для нового столбца, представляющего сумму, поэтому ссылаться на него позже будет немного сложно. Чтобы решить эту проблему, я использовал .as("sum_column") в четвертой строке.

person Daniel de Paula    schedule 04.05.2016
comment
извините, это pyspark, а не просто искра. Следовательно, >>> Таким образом, val среди других команд здесь не распознается - person hedgedandlevered; 04.05.2016
comment
Прошу прощения, не заметил этой детали. Тем не менее, логика должна быть той же. Я считаю, что те же функции доступны для python. - person Daniel de Paula; 04.05.2016
comment
@hedgedandlevered Я попытался вставить адаптированную версию для pyspark, дайте мне знать, если она работает. Извините, но я не очень привык к pyspark. - person Daniel de Paula; 04.05.2016
comment
Я запускаю свой сэссоин из командной строки вот так pyspark --packages com.databricks:spark-csv_2.11:1.4.0. Это писпарк, верно? Я не получаю ваш код для выполнения. Он снова делает ..., как будто он не завершен. я пытаюсь finalDF = yelp_df.where("review_count > 9 AND open = true").groupBy("state").agg(F.sum(col("review_count").alias("sum_column")).sort(col("sum_column").desc) - person hedgedandlevered; 04.05.2016
comment
Вы вставляете новую строку между вызовами? Если это так, вы должны закончить строку с \ - person Daniel de Paula; 04.05.2016
comment
Я нет. Кроме того, это приведет к преждевременному выполнению, а не к проблеме .... - person hedgedandlevered; 04.05.2016
comment
Давайте продолжим обсуждение в чате. - person Daniel de Paula; 04.05.2016
comment
идеально!____________ - person hedgedandlevered; 04.05.2016