как хранить сгруппированные данные в json в pyspark

Я новичок в писпарке

У меня есть набор данных, который выглядит (просто снимок нескольких столбцов)

описание данных

Я хочу сгруппировать свои данные по ключу. Мой ключ

CONCAT(a.div_nbr,a.cust_nbr)

Моя конечная цель - преобразовать данные в JSON, отформатированные следующим образом.

k1[{v1,v2,....},{v1,v2,....}], k2[{v1,v2,....},{v1,v2,....}],....

e.g

248138339 [{ PRECIMA_ID:SCP 00248 0000138339, PROD_NBR:5553505, PROD_DESC:Shot and a Beer Battered Onion Rings (5553505 and 9285840) , PROD_BRND:Molly's Kitchen,PACK_SIZE:4/2.5 LB, QTY_UOM:CA } , 
        { PRECIMA_ID:SCP 00248 0000138339 , PROD_NBR:6659079 , PROD_DESC:Beef Chuck Short Rib Slices, PROD_BRND:Stockyards , PACK_SIZE:12 LBA , QTY_UOM:CA} ,{...,...,} ],

1384611034793[{},{},{}],....

Я создал фрейм данных (в основном я присоединяюсь к двум таблицам, чтобы получить еще несколько полей)

joinstmt = sqlContext.sql(
          "SELECT a.precima_id , CONCAT(a.div_nbr,a.cust_nbr) as
                  key,a.prod_nbr , a.prod_desc,a.prod_brnd ,      a.pack_size , a.qty_uom , a.sales_opp , a.prc_guidance , a.pim_mrch_ctgry_desc , a.pim_mrch_ctgry_id , b.start_date,b.end_date 

ОТ scoop_dtl a присоединиться к scoop_hdr b on (a.precima_id =b.precima_id)")

Теперь, чтобы получить вышеуказанный результат, мне нужно сгруппировать по результату на основе ключа, я сделал следующее

groupbydf = joinstmt.groupBy("key")

Это привело к сгруппированным данным, и после прочтения я узнал, что не могу использовать их напрямую, и мне нужно преобразовать их обратно в кадры данных, чтобы сохранить их.

Я новичок в этом, мне нужна помощь, чтобы преобразовать его обратно в фреймы данных, или я был бы признателен, если есть какие-либо другие способы.


person jeetu    schedule 27.02.2016    source источник


Ответы (2)


Если ваш присоединенный фрейм данных выглядит так:

gender  age
M   5
F   50
M   10
M   10
F   10

Затем вы можете использовать код ниже, чтобы получить желаемый результат

joinedDF.groupBy("gender") \ 
    .agg(collect_list("age").alias("ages")) \
    .write.json("jsonOutput.txt")

Вывод будет выглядеть следующим образом:

{"gender":"F","ages":[50,10]}
{"gender":"M","ages":[5,10,10]}

Если у вас есть несколько столбцов, таких как имя, зарплата. Вы можете добавить столбцы, как показано ниже:

df.groupBy("gender")
    .agg(collect_list("age").alias("ages"),collect_list("name").alias("names"))

Ваш вывод будет выглядеть так:

{"gender":"F","ages":[50,10],"names":["ankit","abhay"]}
{"gender":"M","ages":[5,10,10],"names":["snchit","mohit","rohit"]}
person Sanchit Grover    schedule 12.01.2018
comment
спасибо - на вопрос оператора, как мы можем расширить ваше решение для данных с большим количеством полей? Например. если joinDF содержит [{'gender': 'M', 'name': 'kelly', 'age': 20}, {'gender': M, 'name': 'bob', 'age': 41}] , то при группировке по 'полу' получаем: {'пол': 'М', 'имена': ['келли', 'боб'], 'возраст': [20, 41]} - person Quetzalcoatl; 12.01.2018
comment
Обновил мой ответ. надеюсь, это поможет. - person Sanchit Grover; 16.01.2018
comment
Но упорядочены ли собранные элементы списка? например 50-летний возраст соответствует анкиту, а 10-летний возраст соответствует абхаю в ваших примерах? - person user238607; 02.08.2018

Вы не можете использовать GroupedData напрямую. Сначала его нужно собрать. Это может быть частично покрыто агрегированием со встроенными функциями, такими как collect_list, но просто невозможно достичь желаемого результата со значениями, используемыми для представления ключей, с использованием DataFrameWriter.

Вместо этого можно попробовать что-то вроде этого:

from pyspark.sql import Row
import json

def make_json(kvs):
  k, vs = kvs
  return json.dumps({k[0]: list(vs)})

(df.select(struct(*keys), values)
    .rdd
    .mapValues(Row.asDict)
    .groupByKey()
    .map(make_json))

и saveAsTextFile.

person zero323    schedule 27.02.2016
comment
Вопрос для уточнения: чему соответствуют переменные kvs, *keys и values ​​в примере OP? - person Quetzalcoatl; 12.01.2018