Pyspark: вычисление суммы двух соответствующих столбцов на основе условий двух столбцов в двух СДР.

У меня есть два RDD с одинаковыми столбцами:
rdd1 :-

+-----------------+
|mid|uid|frequency|
+-----------------+
| m1| u1|        1|
| m1| u2|        1|
| m2| u1|        2|
+-----------------+

рдд2 :-

+-----------------+
|mid|uid|frequency|
+-----------------+
| m1| u1|       10|
| m2| u1|       98|
| m3| u2|       21|
+-----------------+

Я хочу рассчитать сумму frequencies на основе mid и uid. Результат должен быть примерно таким:

+-----------------+
|mid|uid|frequency|
+-----------------+
| m1| u1|       11|
| m2| u1|      100|
| m3| u2|       21|
+-----------------+

Заранее спасибо.

РЕДАКТИРОВАТЬ: я также достиг решения таким образом (используя map-reduce):

from pyspark.sql.functions import col

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)]
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)]
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency'])
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency'])

df3 = df1.unionAll(df2)
df4 = df3.map(lambda bbb: ((bbb['mid'], bbb['uid']), int(bbb['frequency'])))\
             .reduceByKey(lambda a, b: a+b)

p = df4.map(lambda p: (p[0][0], p[0][1], p[1])).toDF()

p = p.select(col("_1").alias("mid"), \
             col("_2").alias("uid"), \
             col("_3").alias("frequency"))

p.show()

Выход:

+---+---+---------+
|mid|uid|frequency|
+---+---+---------+
| m2| u1|      100|
| m1| u1|       11|
| m1| u2|        1|
| m3| u2|       21|
+---+---+---------+

person rootcss    schedule 16.04.2016    source источник
comment
Вы можете написать код на Python, чтобы решить эту проблему. Если вы уже пробовали это, вам следует отредактировать вопрос и добавить свой код.   -  person Håken Lid    schedule 16.04.2016
comment
вы пропустили группу в ожидаемом результате   -  person eliasah    schedule 16.04.2016
comment
@HåkenLid Обычно мы можем сделать это на python, используя pandas esp. Но мне нужна была конкретная помощь pyspark.   -  person rootcss    schedule 17.04.2016


Ответы (2)


Вам просто нужно выполнить группу по середине и uid и выполнить операцию суммы:

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)]
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)]
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency'])
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency'])

df3 = df1.unionAll(df2)

df4 = df3.groupBy(df3.mid,df3.uid).sum() \
         .withColumnRenamed("sum(frequency)","frequency")

df4.show()

# +---+---+---------+
# |mid|uid|frequency|
# +---+---+---------+
# | m1| u1|       11|
# | m1| u2|        1|
# | m2| u1|      100|
# | m3| u2|       21|
# +---+---+---------+
person eliasah    schedule 16.04.2016

Я также достиг решения таким образом (используя map-reduce):

from pyspark.sql.functions import col

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)]
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)]
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency'])
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency'])

df3 = df1.unionAll(df2)
df4 = df3.map(lambda bbb: ((bbb['mid'], bbb['uid']), int(bbb['frequency'])))\
             .reduceByKey(lambda a, b: a+b)

p = df4.map(lambda p: (p[0][0], p[0][1], p[1])).toDF()

p = p.select(col("_1").alias("mid"), \
             col("_2").alias("uid"), \
             col("_3").alias("frequency"))

p.show()

Выход:

+---+---+---------+
|mid|uid|frequency|
+---+---+---------+
| m2| u1|      100|
| m1| u1|       11|
| m1| u2|        1|
| m3| u2|       21|
+---+---+---------+
person rootcss    schedule 12.05.2016
comment
Единственная проблема с этим решением заключается в том, что вы теряете всю оптимизацию, выполненную проектом tungsten за DataFrames. stackoverflow.com/questions/31780677/ - person eliasah; 04.04.2017