I found my old code and created an adapted example for your problem. I used the spark graph library Graphframes for this. The path can be determined by a Pregel like message aggregation loop.
Here the code.
First import all modules
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from graphframes import GraphFrame
from pyspark.sql.types import *
from graphframes.lib import *
# shortcut for the aggregate message object from the graphframes.lib
AM=AggregateMessages
# to plot the graph
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
spark = (SparkSession
.builder
.appName("PathReduction")
.getOrCreate()
)
sc=spark.sparkContext
Then create a sample dataset
# create dataframe
raw_data = [
("0","1"),
("1","2"),
("1","5"),
("2","3"),
("2","4"),
("a","b"),
("b","c"),
("c","d")]
schema = ["src","dst"]
data = spark.createDataFrame(data=raw_data, schema = schema)
data.show()
+---+---+
|src|dst|
+---+---+
| 0| 1|
| 1| 2|
| 1| 5|
| 2| 3|
| 2| 4|
| a| b|
| b| c|
| c| d|
+---+---+
For visualisation run
plotData_1 = data.select("src","dst").rdd.collect()
plotData_2 = np.array(plotData_1)
plotData_3=[]
for row in plotData_2:
plotData_3.append((row[0],row[1]))
G=nx.DiGraph(directed=True)
G.add_edges_from(plotData_3)
options = {
'node_color': 'orange',
'node_size': 500,
'width': 2,
'arrowstyle': '-|>',
'arrowsize': 20,
}
nx.draw(G, arrows=True, **options,with_labels=True)

With this message aggregation algorithm you find the paths as you searched them. if you set the flag show_steps
to True
the results of each step is shown which helps to understand.
# if flag is true print results within the loop for debuging
show_steps=False
# max itertions of the loop, should be larger then the longest expected path
max_iter=10
# create vertices from edge data set
vertices=(data.select("src").union(data.select("dst")).distinct().withColumnRenamed('src', 'id'))
edges=data
# create graph to get in and out degrees
gx = GraphFrame(vertices, edges)
# calclulate in and out degrees of each node
inDegrees=gx.inDegrees
outDegrees=gx.outDegrees
if(show_steps==True):
print("in and out degrees")
inDegrees.show()
outDegrees.show()
# create intial vertices
init_vertices=(vertices
# join out degrees on vertices
.join(outDegrees,on="id",how="left")
# join in degree on vertices
.join(inDegrees,on="id",how="left")
# define root, childs in the middle and leafs of the path in order to distinguish full paths later on
.withColumn("nodeType",f.when(f.col("inDegree").isNull(),"root").otherwise(f.when(f.col("outDegree").isNull(),"leaf").otherwise("child")))
# define message with all information [array(id) and array(nodeType)] to be send to the next noe
.withColumn("message",f.array_union(f.array(f.array(f.col("id"))),f.array(f.array(f.col("nodeType")))))
# remove columns that are not used anymore
.drop("inDegree","outDegree")
)
if(show_steps==True):
print("init vertices")
init_vertices.show()
# update graph object with init vertices
gx = GraphFrame(init_vertices, edges)
# define empty dataframe to append found paths on
results = sqlContext.createDataFrame(
sc.emptyRDD(),
StructType([StructField("paths",ArrayType(StringType()),True)])
)
# start loopp for mesage aggregation. Set a max_iter value which has to be larger as the longest path expected
for iter_ in range(max_iter):
if(show_steps==True):
print("iteration step=" + str(iter_))
print("##################################################")
# define the message that should be send. Here we send a message to the source node and we take the column message from the destination source we send backward
msgToSrc = AM.dst["message"]
agg = gx.aggregateMessages(
f.collect_set(AM.msg).alias("aggMess"), # aggregation function is a collect into an array (attention!! this can be an expensive operation in terms of shuffel)
sendToSrc=msgToSrc,
sendToDst=None
)
if(show_steps==True):
print("aggregated message")
agg.show(truncate=False)
# stop loop if no more agg messages collected
if(len(agg.take(1))==0):
print("All paths found in " + str(iter_) + " iterations")
break
# get new vertices to send into next round. Here we have to prepare the next message columns all _column_names are temporary columns for calculation purpose only
vertices_update=(agg
# join initial data to aggregation in order to have to nodeType of the vertice
.join(init_vertices,on="id",how="left")
# exploe the nested array with the path and the nodeType
.withColumn("_explode_to_flatten_array",f.explode(f.col("aggMess")))
# put the path aray into a seperate column
.withColumn("_dataMsg",f.col("_explode_to_flatten_array")[0])
# put the node type into a seperate column
.withColumn("_typeMsg",f.col("_explode_to_flatten_array")[1][0])
# deside if a path is complete. A path is complete if the vertices type is a root and the message type is a leaf
.withColumn("pathComplete",f.when(((f.col("nodeType")=="root") & (f.col("_typeMsg")=="leaf")),True).otherwise(False))
# append the curent vertice id to the path array that is send forward
.withColumn("_message",f.array_union(f.array(f.col("id")),f.col("_dataMsg")))
# merge together the path array and the nodeType array for the new message object
.withColumn("message",f.array_union(f.array(f.col("_message")),f.array(f.array(f.col("_typeMsg")))))
)
if(show_steps==True):
print("new vertices with all temp columns")
vertices_update.show()
# add complete paths to the result dataframe
results=(
results
.union(
vertices_update
.where(f.col("pathComplete")==True)
.select(f.col("_message"))
)
)
# chache the vertices for next iteration and only push forward the two relevant columns in order to reduce data shuffeling between spark executors
cachedNewVertices = AM.getCachedDataFrame(vertices_update.select("id","message"))
# create new updated graph object for next iteration
gx = GraphFrame(cachedNewVertices, gx.edges)
print("##################################################")
print("Collecting result set")
results.show()
it shows then the correct results
All paths found in 3 iterations
##################################################
Collecting result set
+------------+
| paths|
+------------+
| [0, 1, 5]|
|[0, 1, 2, 3]|
|[0, 1, 2, 4]|
|[a, b, c, d]|
+------------+
to get your final dataframe you can join it back or take the first and last element of the array into separate columns
result2=(results
.withColumn("dist",f.element_at(f.col("paths"), 1))
.withColumn("node",f.element_at(f.col("paths"), -1))
)
result2.show()
+------------+----+----+
| paths|dist|node|
+------------+----+----+
| [0, 1, 5]| 0| 5|
|[0, 1, 2, 3]| 0| 3|
|[0, 1, 2, 4]| 0| 4|
|[a, b, c, d]| a| d|
+------------+----+----+
You can write the same algorithm with the Graphframes Pregel API I suppose.
P.S: The algorithm in this form might cause problems if the graph has lops or backward directed edges. I had another algorithm to first clean up loops and cycles