I have a problem quite similar to the one described here: "How to perform one operation on each executor once in spark". I followed the first approach in the first answer but still got a serialization problem.
What I want to do: I have queries in the form of (sourceVertex, targetVertex) tuples, I send these to the executors, and the executors return the shortest path for each query. For this I'm using jgrapht.
When I implement it like this:
class ShortestPath(graph: SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge],
                   bc: Broadcast[SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge]]) extends Serializable {
  def calculateShortestPath(vertexRDD: RDD[Node]) = {
    val result = vertexRDD.map(vertex => {
      val dijkstraShortestPath: DijkstraShortestPath[Node, DefaultWeightedEdge] =
        new DijkstraShortestPath[Node, DefaultWeightedEdge](bc.value)
      val distanceIn = dijkstraShortestPath.getPath(vertex, Node(4, 1, true)).getWeight()
      distanceIn
    })
    result.collect().foreach(println(_))
  }
}

object ShortestPath {
  def apply(graph: SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge],
            bc: Broadcast[SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge]]): ShortestPath =
    new ShortestPath(graph, bc)
}
everything works.
But I think I'm creating a dijkstraShortestPath object for each task, am I right?
My aim is to create this object once per executor and reuse it for every task on that executor.
The linked answer says to create an object with a lazy val, instantiate your thing there, and then use it in the RDD map function. I implemented that solution like this:
object Dij {
  lazy val dijsktra = {
    val graph = GraphCreator.createGraph()
    val dijkstraShortestPath: DijkstraShortestPath[Node, DefaultWeightedEdge] =
      new DijkstraShortestPath[Node, DefaultWeightedEdge](graph)
    dijkstraShortestPath
  }
}
and used it in the ShortestPath class:
val result = vertexRDD.map(vertex => {
  val dijkstraShortestPath = Dij.dijsktra
  val distanceIn = dijkstraShortestPath.getPath(vertex, Node(4, 1, true)).getWeight()
  dijkstraShortestPath
})
result.collect().foreach(println(_))
but then I get a serialization error that says:
- object not serializable (class: org.jgrapht.alg.shortestpath.DijkstraShortestPath, value: org.jgrapht.alg.shortestpath.DijkstraShortestPath@2cb8e13b)
which is right: when I look at the implementation, DijkstraShortestPath does not implement Serializable.
My other question is: if it is not Serializable, then how did my first implementation work?
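For reference, here is a minimal sketch in plain Scala (no Spark) of the behaviour I understand the lazy val approach to rely on: a `lazy val` inside an `object` is built on first access and then reused by every later call in the same JVM. `ExpensiveHelper`, `HelperHolder` and `LazyValDemo` are hypothetical names made up for this illustration, standing in for `DijkstraShortestPath` and `Dij`; they are not part of jgrapht or Spark.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an expensive, non-serializable helper
// such as DijkstraShortestPath.
final class ExpensiveHelper(val id: Int) {
  def weight(vertex: Int): Double = vertex * 1.5
}

object HelperHolder {
  // Counts how many times the helper is actually constructed.
  val constructions = new AtomicInteger(0)

  // Built on first access, then reused for every later call in this JVM.
  lazy val helper: ExpensiveHelper =
    new ExpensiveHelper(constructions.incrementAndGet())
}

object LazyValDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for the body of an RDD map: each element looks up the
    // shared helper and returns a plain Double.
    val weights = Seq(1, 2, 3).map(v => HelperHolder.helper.weight(v))
    println(weights)
    // Number of times the helper was constructed across all three calls.
    println(HelperHolder.constructions.get())
  }
}
```

In this sketch the function passed to `map` only refers to the holder object and returns a `Double`, so the helper itself never has to leave the JVM that created it.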