Постройте иерархию из набора реляционных данных с помощью Pyspark

Я новичок в Python и застрял в построении иерархии на основе набора реляционных данных.
Было бы очень полезно, если бы у кого-то была идея, как с этим поступить.

У меня есть реляционный набор данных с такими данными, как

_currentnode,  childnode_  
 root,         child1  
 child1,       leaf2  
 child1,       child3  
 child1,       leaf4  
 child3,       leaf5  
 child3,       leaf6  

скоро. Я ищу код python или pyspark для
построения фрейма данных иерархии, как показано ниже

_level1, level2,  level3,  level4_  
root,    child1,  leaf2,   null  
root,    child1,  child3,  leaf5  
root,    child1,  child3,  leaf6  
root,    child1,  leaf4,   null  

Данные представляют собой буквенно-цифровые символы и представляют собой огромный набор данных [~ 50 миллионов записей].
Кроме того, корень иерархии известен и может быть встроен в код.
Итак, в приведенном выше примере корень иерархии - «корень».


person Vardhan    schedule 18.06.2020    source источник


Ответы (1)


Кратчайший путь с Pyspark

Входные данные можно интерпретировать как график со связями между currentnode и childnode. Тогда возникает вопрос: каков кратчайший путь между корневым узлом и всеми конечными узлами, и он называется кратчайший путь из одного источника.

В Spark есть Graphx для параллельных вычислений графиков. К сожалению, GraphX ​​не предоставляет Python API (более подробную информацию можно найти здесь). Библиотека графов с поддержкой Python - это GraphFrames. GraphFrames использует части GraphX.

И GraphX, и GraphFrames предоставляют решение для sssp. И снова, к сожалению, обе реализации возвращают только длину кратчайших путей, а не сами пути (GraphX ​​ и GraphFrames). Но этот ответ предоставляет реализацию алгоритма для GraphX ​​и Scala, который также возвращает пути. Все три решения используют Pregel.

Перевод вышеупомянутого ответа на GraphFrames / Python:

1. Подготовка данных

Укажите уникальные идентификаторы для всех узлов и измените имена столбцов, чтобы они соответствовали описанным именам здесь

import pyspark.sql.functions as F

df = ...

vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()

edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
        .join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache() 
Nodes                   Edges
+------+------------+   +-----------+---------+------------+------------+
|  node|          id|   |currentnode|childnode|         src|         dst|
+------+------------+   +-----------+---------+------------+------------+
| leaf2| 17179869184|   |     child1|    leaf4| 25769803776|249108103168|
|child1| 25769803776|   |     child1|   child3| 25769803776| 68719476736|
|child3| 68719476736|   |     child1|    leaf2| 25769803776| 17179869184|
| leaf6|103079215104|   |     child3|    leaf6| 68719476736|103079215104|
|  root|171798691840|   |     child3|    leaf5| 68719476736|214748364800|
| leaf5|214748364800|   |       root|   child1|171798691840| 25769803776|
| leaf4|249108103168|   +-----------+---------+------------+------------+
+------+------------+   

2. Создайте GraphFrame.

from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)

3. Создайте UDF, которые будут составлять отдельные части алгоритма Прегеля.

The message type:
from pyspark.sql.types import *
vertColSchema = StructType()\
      .add("dist", DoubleType())\
      .add("node", StringType())\
      .add("path", ArrayType(StringType(), True))

Вершинная программа:

def vertexProgram(vd, msg):
    if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
        return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
    else:
        return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)

Исходящие сообщения:

def sendMsgToDst(src, dst):
    srcDist = src.__getitem__(0)
    dstDist = dst.__getitem__(0)
    if srcDist < (dstDist - 1):
        return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
    else:
        return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)

Агрегация сообщений:

def aggMsgs(agg):
    shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
    return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)

4. Соедините детали.

from graphframes.lib import Pregel
result = graph.pregel.withVertexColumn(colName = "vertCol", \
    initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
    .otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
    updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
    .sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
    .aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
    .setMaxIter(10) \
    .setCheckpointInterval(2) \
    .run()
result.select("vertCol.path").show(truncate=False)   

Примечания:

  • maxIter должно быть установлено значение не меньше самого длинного пути. Если значение больше, результат не изменится, но время вычислений станет больше. Если значение слишком мало, более длинные пути будут отсутствовать в результате. Текущая версия GraphFrames (0.8.0) не поддерживает остановку цикла, когда больше не отправляются новые сообщения.
  • checkpointInterval должно быть установлено значение меньше maxIter. Фактическое значение зависит от данных и доступного оборудования. Когда возникает исключение OutOfMemory или сеанс Spark зависает на некоторое время, значение можно уменьшить.

Конечный результат - обычный фрейм данных с содержимым

+-----------------------------+
|path                         |
+-----------------------------+
|[root, child1]               |
|[root, child1, leaf4]        |
|[root, child1, child3]       |
|[root]                       |
|[root, child1, child3, leaf6]|
|[root, child1, child3, leaf5]|
|[root, child1, leaf2]        |
+-----------------------------+

При необходимости здесь можно отфильтровать нелистовые узлы.

person werner    schedule 22.06.2020
comment
очень лаконичная и чистая реализация. Только одно сомнение - как этот интервал контрольных точек работает в этом алгоритме? - person Amardeep Kohli; 08.02.2021
comment
Параметр контрольной точки определяет, как часто будет создаваться контрольная точка. Эффект контрольной точки здесь заключается в том, что она уменьшает требуемый объем памяти, но увеличивает количество операций ввода-вывода. - person werner; 10.02.2021