I am writing a graph algorithm that finds paths from certain source vertices to target vertices, based on a path definition, using the Pregel API of Spark's GraphX library.
I am using Scala 2.12 with a Spark standalone cluster, version 3.3.2.
As I understand it, the Pregel API works like this: a first superstep processes the initial message on every vertex of the graph, and then further supersteps run repeatedly, each operating only on the vertices that received a message in the previous step.
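To make that description concrete, here is a minimal single-machine sketch of the superstep loop. It is a simplified model written for illustration, not GraphX's actual implementation: `PregelSketch`, `run`, and the four-argument `sendMsg` signature are hypothetical names, and the graph is a plain in-memory `Map` plus an edge list rather than an RDD-backed `Graph`.

```scala
// Simplified, in-memory model of a Pregel-style loop (illustration only;
// this is NOT how GraphX implements Pregel internally).
object PregelSketch {
  type VertexId = Long

  def run[VD, A](
      vertices: Map[VertexId, VD],
      edges: Seq[(VertexId, VertexId)],
      initialMsg: A,
      maxIterations: Int)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: (VertexId, VD, VertexId, VD) => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Map[VertexId, VD] = {

    // First superstep: every vertex processes the initial message.
    var state = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    // After the first superstep every vertex counts as active.
    var active: Set[VertexId] = state.keySet
    var i = 0

    while (active.nonEmpty && i < maxIterations) {
      // Only edges that touch an active vertex may send messages;
      // messages for the same destination are combined with mergeMsg.
      val messages: Map[VertexId, A] = edges.iterator
        .filter { case (s, d) => active(s) || active(d) }
        .flatMap { case (s, d) => sendMsg(s, state(s), d, state(d)) }
        .toSeq
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }

      // Vertices that received a message run the vertex program again.
      state = state ++ messages.map { case (id, msg) => id -> vprog(id, state(id), msg) }
      active = messages.keySet
      i += 1
    }
    state
  }
}

// Usage: propagate the id of vertex 1 along the chain 1 -> 2 -> 3,
// using an immutable Set as the vertex attribute.
object PregelSketchDemo {
  val result: Map[Long, Set[Long]] = PregelSketch.run[Set[Long], Set[Long]](
    Map(1L -> Set.empty[Long], 2L -> Set.empty[Long], 3L -> Set.empty[Long]),
    Seq((1L, 2L), (2L, 3L)),
    Set.empty[Long],
    10)(
    (id, attr, msg) => if (msg.isEmpty && id == 1L) attr + id else attr ++ msg,
    (_, srcAttr, dstId, dstAttr) => {
      val diff = srcAttr.diff(dstAttr)
      if (diff.nonEmpty) Iterator.single(dstId -> diff) else Iterator.empty
    },
    _ ++ _)
}
```

In this model the loop keeps going as long as some vertex received a message, which is the behaviour I expect from my GraphX program as well.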
When I debug the program, I can see that the VertexProperty of each vertex is updated after the first superstep. The algorithm is then able to compute the next set of messages and send them to the destination vertices, where they are also applied. However, the change that this last VertexProgram call applied to those vertices, based on the messages received, is lost in the next iteration of SendMessage, so no more messages are sent and the algorithm terminates prematurely.
The code is as follows:
import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}

class VertexProperty(var typeName: String, var name: String) extends Serializable {
  var srcVertexIdsForShortcut: Set[VertexId] = Set.empty[VertexId]
}

class EdgeProperty(var label: String, var srcType: String, var destType: String) extends Serializable

class Message(var shortcutLabel: String) extends Serializable {
  var srcVertexIds: Set[VertexId] = Set.empty[VertexId]

  def this(shortcutLabel: String, srcVertexIds: Set[VertexId]) = {
    this(shortcutLabel)
    this.srcVertexIds ++= srcVertexIds
  }
}
object VertexProgram extends Serializable {
  def apply(vertexId: VertexId, vertexProperty: VertexProperty, message: Message): VertexProperty = {
    // on the initial message, for the source vertex type, add the vertex id to srcVertexIdsForShortcut
    if (message.srcVertexIds.isEmpty && vertexProperty.typeName.equals("Workbook")) {
      vertexProperty.srcVertexIdsForShortcut += vertexId
    }
    // on receiving a message with vertex ids, add them to srcVertexIdsForShortcut
    vertexProperty.srcVertexIdsForShortcut ++= message.srcVertexIds
    vertexProperty
  }
}
class SendMessage(var shortcutModel: ShortcutModel) extends Serializable {
  def apply(edgeTriplet: EdgeTriplet[VertexProperty, EdgeProperty]): Iterator[(VertexId, Message)] = {
    val matchingEdgePropertyInModel = shortcutModel.isInModel(edgeTriplet)
    if (matchingEdgePropertyInModel.isEmpty) {
      Iterator.empty
    } else {
      val isReversed = matchingEdgePropertyInModel.get.label.startsWith("-")
      val srcAttr = if (isReversed) edgeTriplet.dstAttr else edgeTriplet.srcAttr
      val dstAttr = if (isReversed) edgeTriplet.srcAttr else edgeTriplet.dstAttr
      val diff = srcAttr.srcVertexIdsForShortcut.diff(dstAttr.srcVertexIdsForShortcut)
      if (diff.nonEmpty) {
        val targetId = if (isReversed) edgeTriplet.srcId else edgeTriplet.dstId
        Iterator.single((targetId, new Message(shortcutModel.shortcutLabel, diff)))
      } else {
        Iterator.empty
      }
    }
  }
}
object MergeMessage extends Serializable {
  def apply(message1: Message, message2: Message): Message = {
    val merged = new Message(message1.shortcutLabel)
    merged.srcVertexIds ++= message1.srcVertexIds
    merged.srcVertexIds ++= message2.srcVertexIds
    merged
  }
}
/**
create a graph with mock data = Graph[VertexProperty, EdgeProperty]
*/
val graphWithShortcuts =
  Pregel(graph, new Message("upstreamDatabases"), 10)(
    VertexProgram.apply,
    new SendMessage(shortcutModelBroadcast.value).apply,
    MergeMessage.apply)
I expect the algorithm to keep iterating along a path in the graph. However, because the update is lost after the first iteration, the algorithm stops. I tried the implementation in both Java and Scala, suspecting an issue with using Pregel in Spark GraphX from Java, but I see the same result in both. Hence, something fundamental must be missing in my implementation.
Could it have something to do with the VertexProperty class itself and how it is updated in VertexProgram? The Pregel examples I have found online all work on simple graphs such as Graph[Int, Int] or some other primitive type. In my case, the VertexProperty class is more complex and mutable, and I am wondering whether I am updating it in the wrong way.
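For reference, this is a minimal sketch of the immutable alternative that this suspicion points to. It rests on an assumption that is not confirmed anywhere above: that GraphX compares the old and new vertex attributes to decide which updates reach the edge triplets, so returning the same mutated object could make an update look like no change. The names `ImmutableVertexSketch` and `vertexProgram` are hypothetical, and `VertexId` is redeclared as `Long` (which is what GraphX aliases it to) so the snippet runs without Spark on the classpath.

```scala
object ImmutableVertexSketch {
  // GraphX defines VertexId as an alias for Long; redeclared here so the
  // sketch compiles without a Spark dependency.
  type VertexId = Long

  // Immutable counterparts of VertexProperty and Message (hypothetical rewrite).
  final case class VertexProperty(
      typeName: String,
      name: String,
      srcVertexIdsForShortcut: Set[VertexId] = Set.empty)

  final case class Message(
      shortcutLabel: String,
      srcVertexIds: Set[VertexId] = Set.empty)

  // Same logic as VertexProgram.apply, but it returns a NEW VertexProperty
  // via copy(...) instead of mutating the instance that was passed in.
  def vertexProgram(vertexId: VertexId, vp: VertexProperty, message: Message): VertexProperty = {
    // On the initial (empty) message, a source-type vertex records its own id.
    val ownId: Set[VertexId] =
      if (message.srcVertexIds.isEmpty && vp.typeName == "Workbook") Set(vertexId)
      else Set.empty
    // Ids carried by the message are merged in as well.
    vp.copy(srcVertexIdsForShortcut = vp.srcVertexIdsForShortcut ++ ownId ++ message.srcVertexIds)
  }
}
```

With case classes, the old and the new attribute are distinct values that compare by content, so a changed set of ids is visible as a change. Whether this is what actually fixes the lost update in Pregel is exactly what I am unsure about.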