2

I have a graph frame with vertices and edges as below. I am running this on pyspark in jupyter notebook.

# Vertex table: one row per person, keyed by the string column "id".
vertices = sqlContext.createDataFrame([
    ("12345", "Alice", "Employee"),
    ("15789", "Bob", "Employee"),
    ("13467", "Charlie", "Manager"),
    ("14890", "David", "Director"),
    ("17737", "Fanny", "CEO"),
], ["id", "name", "title"])

# Edge table: each "works" edge runs from src to dst; in this data the
# chain of edges leads from the employees up to the CEO (id 17737).
edges = sqlContext.createDataFrame([
    ("12345", "13467", "works"),
    ("15789", "13467", "works"),
    ("13467", "14890", "works"),
    ("14890", "17737", "works"),
], ["src", "dst", "relationship"])

I need to find the hierarchical paths of each emp_id up to the highest level (which is the CEO in this case). I am trying the bfs approach, and so far I have only been successful in getting the path for one emp_id. Below is my code.

# Build the graph and run a breadth-first search from a single employee
# to the vertex whose title is CEO, following only "works" edges.
g = GraphFrame(vertices, edges)
result = g.bfs(
    fromExpr="id == '12345'",
    toExpr="title == 'CEO'",
    edgeFilter="relationship == 'works'",
    maxPathLength=5,
)
# Print up to 5 rows without truncating the cell contents.
result.show(5, truncate=False)

Output:

+----------------------+-------------------+-----------------------+-------------------+----------------------+-------------------+-----------------+
|from                  |e0                 |v1                     |e1                 |v2                    |e2                 |to               |
+----------------------+-------------------+-----------------------+-------------------+----------------------+-------------------+-----------------+
|[12345,Alice,Employee]|[12345,13467,works]|[13467,Charlie,Manager]|[13467,14890,works]|[14890,David,Director]|[14890,17737,works]|[17737,Fanny,CEO]|
+----------------------+-------------------+-----------------------+-------------------+----------------------+-------------------+-----------------+

I can store this information in a variable and extract it using the collect() method. I want to loop through all the ids from the vertices which have a path to the CEO and write the result to a dataframe. If anyone is familiar with graphframes, can you please help me with this? I have tried looking into other solutions, but none are working in my case.

Expected Output:

+-------+--------------------------+
|user_id|path                      |
+-------+--------------------------+
|12345  |12345->13467->14890->17737|
|15789  |15789->13467->14890->17737|
|13467  |13467->14890->17737       |
|14890  |14890->17737              |
|17737  |17737                     |
+-------+--------------------------+
mck
  • 40,932
  • 13
  • 35
  • 50
Joe14_1990
  • 89
  • 1
  • 9

1 Answer

1

I adapted this answer for your question and tidied up its result to get your desired output. Note that you need to interchange 'src' and 'dst' in the edges dataframe for that answer to work, but I think that, with some effort in modifying that answer, the edges dataframe can be used in its original form...

from graphframes import GraphFrame
from graphframes.lib import Pregel
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Vertex table: (id, name, title) for every person in the hierarchy.
vertices = spark.createDataFrame(
    data=[
        ("12345", "Alice", "Employee"),
        ("15789", "Bob", "Employee"),
        ("13467", "Charlie", "Manager"),
        ("14890", "David", "Director"),
        ("17737", "Fanny", "CEO"),
    ],
    schema=["id", "name", "title"],
)

# Edge table. NOTE: the first column is labelled "dst" and the second "src",
# i.e. the direction is swapped relative to the question's edges so that
# edges run away from the CEO vertex.
edges = spark.createDataFrame(
    data=[
        ("12345", "13467", "works"),
        ("15789", "13467", "works"),
        ("13467", "14890", "works"),
        ("14890", "17737", "works"),
    ],
    schema=["dst", "src", "relationship"],
)

g = GraphFrame(vertices, edges)

# Per-vertex state carried through the Pregel run (also used as the
# message type): a distance, a node id and the list of ids on the path.
vertColSchema = StructType([
    StructField("dist", DoubleType()),
    StructField("node", StringType()),
    StructField("path", ArrayType(StringType(), True)),
])

def vertexProgram(vd, msg):
    """Choose the state a vertex keeps after its messages were aggregated.

    vd and msg are indexable (dist, node, path) triples; msg may be None
    when the vertex received no message.

    Returns vd's own triple when there is no message or when vd's dist is
    strictly smaller than the message's dist. Otherwise returns the
    message's dist and path combined with vd's own node.
    """
    own_dist, own_node, own_path = vd[0], vd[1], vd[2]
    # Adopt the message unless the current distance is strictly better.
    if msg is not None and not (own_dist < msg[0]):
        return (msg[0], own_node, msg[2])
    return (own_dist, own_node, own_path)

# Wrap vertexProgram as a Spark UDF whose return type is the vertColSchema struct.
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)

def sendMsgToDst(src, dst):
    """Build the message an edge's source vertex sends to its destination.

    src and dst are indexable (dist, node, path) triples.

    Returns None when src's dist is not smaller than dst's dist minus one.
    Otherwise returns (src dist + 1, src node, src path + [dst node]).
    """
    src_dist = src[0]
    # Nothing to send unless going through src would shorten dst's distance.
    if not (src_dist < (dst[0] - 1)):
        return None
    extended_path = src[2] + [dst[1]]
    return (src_dist + 1, src[1], extended_path)

# Wrap sendMsgToDst as a Spark UDF whose return type is the vertColSchema struct.
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)

def aggMsgs(agg):
    """Reduce the messages received by one vertex to the closest one.

    agg is a sequence of indexable (dist, node, path) triples.

    Returns the triple with the smallest dist (index 0) as a plain tuple;
    on ties the earliest message in agg wins. Returns None when agg is
    empty.
    """
    if not agg:
        return None
    # Compare on the distance field; index 1 is the sender's node id and
    # says nothing about which path is shortest.
    best = min(agg, key=lambda m: m[0])
    return (best[0], best[1], best[2])

# Wrap aggMsgs as a Spark UDF whose return type is the vertColSchema struct.
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)

# Run Pregel: every vertex carries a (dist, node, path) struct in "vertCol".
# The vertex with id "17737" starts at distance 0 with a path holding only
# itself; every other vertex starts at infinite distance with a placeholder
# path. Each iteration, edges whose source is closer send an extended path
# to their destination.
result = (
    g.pregel.withVertexColumn(
        colName="vertCol",

        # "id" is a string column, so compare it with a string literal
        # instead of relying on an implicit string-to-int cast.
        initialExpr=F.when(
            F.col("id") == "17737",
            F.struct(F.lit(0.0), F.col("id"), F.array(F.col("id")))
        ).otherwise(
            F.struct(F.lit(float("inf")), F.col("id"), F.array(F.lit("")))
        ).cast(vertColSchema),

        updateAfterAggMsgsExpr=vertexProgramUdf(F.col("vertCol"), Pregel.msg())
    )
    # Pregel.src / Pregel.dst reference the source and destination vertex
    # columns of each edge when building the message.
    .sendMsgToDst(sendMsgToDstUdf(Pregel.src("vertCol"), Pregel.dst("vertCol")))
    .aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg())))
    .setMaxIter(5)    ## This should be greater than the max depth of the graph
    .setCheckpointInterval(1)
    .run()
)

# Keep only the node id and the accumulated path from each vertex's state.
df = result.select("vertCol.node", "vertCol.path").repartition(1)
# Disable truncation: the path cells are longer than show()'s default
# 20-character limit and would otherwise be cut off.
df.show(truncate=False)
+-----+----------------------------+
|node |path                        |
+-----+----------------------------+
|12345|[17737, 14890, 13467, 12345]|
|15789|[17737, 14890, 13467, 15789]|
|13467|[17737, 14890, 13467]       |
|14890|[17737, 14890]              |
|17737|[17737]                     |
+-----+----------------------------+

# Paths were accumulated starting from the CEO, so reverse each one and
# join the ids with "->" to get an employee-to-CEO string.
final = df.select('node', F.concat_ws('->', F.reverse('path')).alias('path'))
# Disable truncation so the full path string is printed (show() cuts
# cells at 20 characters by default).
final.show(truncate=False)
+-----+--------------------------+
|node |path                      |
+-----+--------------------------+
|12345|12345->13467->14890->17737|
|15789|15789->13467->14890->17737|
|13467|13467->14890->17737       |
|14890|14890->17737              |
|17737|17737                     |
+-----+--------------------------+
mck
  • 40,932
  • 13
  • 35
  • 50