
Taking the example code from https://livebook.manning.com/book/spark-graphx-in-action/chapter-6/1:

import org.apache.spark.graphx._

// Vertex state: (visited, distance from origin, list of predecessor ids)
def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
    var g2 = g.mapVertices(
        (vid,vd) => (false, if (vid == origin) 0.0 else Double.MaxValue,
            List[VertexId]()))
    for (i <- 1L to g.vertices.count-1) {
        // Pick the unvisited vertex with the smallest tentative distance
        val currentVertexId =
            g2.vertices.filter(!_._2._1)
                .fold((0L,(false,Double.MaxValue,List[VertexId]())))((a,b) =>
                    if (a._2._2 < b._2._2) a else b)
                ._1
        // Send (distance + edge weight, path) along the current vertex's out-edges
        val newDistances = g2.aggregateMessages[(Double,List[VertexId])](
            ctx => if (ctx.srcId == currentVertexId)
                ctx.sendToDst((
                    ctx.srcAttr._2 + ctx.attr,
                    ctx.srcAttr._3 :+ ctx.srcId)),
            (a,b) => if (a._1 < b._1) a else b)
        // Mark the current vertex visited and keep the shorter distance and its path
        g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
            val newSumVal =
                newSum.getOrElse((Double.MaxValue,List[VertexId]()))
            (vd._1 || vid == currentVertexId,
                math.min(vd._2, newSumVal._1),
                if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)})
    }
    // Pair each original vertex attribute with List(distance, path)
    g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
        (vd, dist.getOrElse((false,Double.MaxValue,List[VertexId]())).productIterator.toList.tail))
}

val myVertices = spark.sparkContext.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"),
  (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
val myEdges = spark.sparkContext.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
  Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),
  Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
  Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))
val myGraph = Graph(myVertices, myEdges)

val result = dijkstra(myGraph, 1L)

result.vertices.map(_._2).collect
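To cross-check the GraphX output without Spark, here is a minimal plain-Scala Dijkstra over the same directed edge list. It is a sketch, not the book's code: `LocalDijkstra` and `shortestPaths` are hypothetical names, and it returns, for each reachable vertex, the distance plus the list of vertices preceding it on the shortest path, mirroring the `(distance, path)` pair the GraphX version attaches.

```scala
import scala.collection.mutable

// Plain-Scala Dijkstra over a directed, weighted edge list (no Spark involved).
// For every vertex reachable from `origin` it returns (distance, path), where
// `path` lists the vertices preceding that vertex on the shortest path. Unlike
// the GraphX version, unreachable vertices are simply absent from the result.
object LocalDijkstra {
  def shortestPaths(edges: Seq[(Long, Long, Double)], origin: Long): Map[Long, (Double, List[Long])] = {
    // Adjacency list: source id -> (destination id, weight)
    val adj: Map[Long, Seq[(Long, Double)]] =
      edges.groupBy(_._1).map { case (src, es) => src -> es.map(e => (e._2, e._3)) }
    val best = mutable.Map[Long, (Double, List[Long])](origin -> ((0.0, Nil)))
    val visited = mutable.Set[Long]()
    // PriorityQueue is a max-heap, so invert the comparison to pop the smallest distance first
    val byDistanceAscending = Ordering.fromLessThan[(Double, Long)]((a, b) => a._1 > b._1)
    val queue = mutable.PriorityQueue.empty[(Double, Long)](byDistanceAscending)
    queue.enqueue((0.0, origin))
    while (queue.nonEmpty) {
      val (dist, u) = queue.dequeue()
      if (!visited(u)) { // skip stale queue entries
        visited += u
        val pathToU = best(u)._2
        for ((v, w) <- adj.getOrElse(u, Nil)) {
          val candidate = dist + w
          if (best.get(v).forall(_._1 > candidate)) {
            best(v) = (candidate, pathToU :+ u)
            queue.enqueue((candidate, v))
          }
        }
      }
    }
    best.toMap
  }
}
```

For origin 1 on the sample graph this gives, for example, 7.0 for B, 14.0 for E via 1 and 2, and 22.0 for G via 1, 4 and 6. For a graph as small as the ~50-vertex, ~300-edge one mentioned in the comments, collecting the edges to the driver (e.g. `myGraph.edges.map(e => (e.srcId, e.dstId, e.attr)).collect().toSeq`) and running something like this locally is one way to avoid GraphX caching altogether; whether that fits depends on the real workload.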

Every time I run the GraphX code above, the resulting VertexRDDs stay in memory and I cannot release them.


It seems that GraphX caches the graph data even though the code never calls cache() or persist(). Is it possible to release the previous run's RDD data from memory?

I have tried unpersisting with result.unpersist(), result.vertices.unpersist() and result.edges.unpersist(), and even calling result.checkpoint().

Ultimately, I want to run the code in a for loop to compute results for different origins, and unless I can release the RDDs from earlier runs, I run into memory issues.

Update: a brute-force method I came up with to clear all cached VertexRDDs and EdgeRDDs:

for ((k,v) <- spark.sparkContext.getPersistentRDDs) {
  val convertedToString = v.toString()
  if (convertedToString.contains("VertexRDD") || convertedToString.contains("EdgeRDD")) {
      v.unpersist()
  }
}
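A narrower variant of the brute-force loop is to snapshot the ids in `getPersistentRDDs` before each run and unpersist only the RDDs that appeared during it, instead of matching on `toString`. This is a sketch under assumptions: `PersistDiff.newlyPersisted` is a hypothetical helper (not a Spark API), `origins` is a hypothetical list of origin ids, and it assumes no other job persists RDDs on the same `SparkContext` concurrently, since those would be unpersisted too.

```scala
import scala.collection.{Map => CMap, Set => CSet}

// Hypothetical helper (not part of Spark): returns the entries of `now`
// whose ids were not present in the earlier snapshot `before`.
object PersistDiff {
  def newlyPersisted[A](before: CSet[Int], now: CMap[Int, A]): Map[Int, A] =
    now.filter { case (id, _) => !before.contains(id) }.toMap
}

// Intended use (assumes the `dijkstra` function and `myGraph` from the
// question, plus a hypothetical `origins: Seq[Long]`):
//
//   val sc = spark.sparkContext
//   val results = origins.map { origin =>
//     val before = sc.getPersistentRDDs.keySet
//     // Collect first: once the result's RDDs are unpersisted,
//     // touching them again would recompute the whole lineage.
//     val out = dijkstra(myGraph, origin).vertices.collect()
//     PersistDiff.newlyPersisted(before, sc.getPersistentRDDs).values
//       .foreach(_.unpersist(blocking = false))
//     origin -> out
//   }
```

Because `myGraph` is built before the loop, its own cached vertex and edge RDDs are already in the snapshot and are left alone; only what each `dijkstra` call persisted gets released.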
Regalia9363
  • small amount of data. salesman? – thebluephantom Feb 06 '21 at 14:46
  • @thebluephantom sorry, what do you mean? – Regalia9363 Feb 06 '21 at 15:01
  • travelling salesman algorithm? interesting issue – thebluephantom Feb 06 '21 at 15:03
  • @thebluephantom the code snippet is dijkstra's shortest path algorithm, I suppose similar to salesman algorithm – Regalia9363 Feb 06 '21 at 15:07
  • Did notice that, but I thought it could be a variation on it – thebluephantom Feb 06 '21 at 15:17
  • Does this answer your question? [How to uncache RDD?](https://stackoverflow.com/questions/25938567/how-to-uncache-rdd) – mazaneicha Feb 06 '21 at 16:50
  • @mazaneicha I have tried all the unpersist methods that can be applied to the updated graph, but it does not clear the memory. I'm now looking into unpersisting by filtering out the unnecessary cache using `spark.sparkContext.getPersistentRDDs`. Edit: Sorry, I just saw the non-accepted answer in your link, and it seems he is approaching it the way I am trying to now. – Regalia9363 Feb 06 '21 at 17:10
  • @mazaneicha. I think the point is there is no cache or persist in the code, unless my eyes are bad – thebluephantom Feb 06 '21 at 17:11
  • @thebluephantom Yes, correct – Regalia9363 Feb 06 '21 at 17:12
  • @thebluephantom I'd be surprised if GraphX doesn't cache vertex and edge RDDs internally, especially for the iterative algorithms they use. OP might want to go through the source code to confirm it, but that doesn't change the way those RDDs can be `unpersist()`ed (which the post I referenced shows). Good question nevertheless, upvoted :) – mazaneicha Feb 06 '21 at 17:54
  • I've done loops operating on graphx graphs and noticed the same behaviour - the vertex and edge RDDs get cached and stay cached - up until the point that executor memory gets full - then it starts uncaching the previously cached RDDs - seems to be in a FIFO manner - so I guess that unless you are experiencing out of memory errors you could just leave as is like I did – Ranvir Mohanlal Feb 06 '21 at 18:22
  • @mazaneicha. In Spark, RDDs are not persisted in memory by default. To avoid recomputation, they must be explicitly cached when using them multiple times (see the Spark Programming Guide). Graphs in GraphX behave the same way. When using a graph multiple times, make sure to call Graph.cache() on it first. – thebluephantom Feb 06 '21 at 18:24
  • Probably no issue here; it is small and fits into memory. Why not cache if you can keep it in memory? Spark does in-memory processing as much as possible – thebluephantom Feb 06 '21 at 18:37
  • @RanvirMohanlal I'm trying to find the Dijkstra shortest paths from each of the connected vertices of a graph with ~50 vertices and ~300 edges, and I run into an OOM error. Additionally, the code gets slower after each iteration, which I suspect is due to a lineage issue. I tried to truncate the lineage with `result.checkpoint()` after every run, but it still slows down. Have you faced this problem? – Regalia9363 Feb 06 '21 at 18:43
  • @thebluephantom Looks like vertices and edges are `cache()`ed during `Graph` construction, https://github.com/apache/spark/blob/branch-2.3/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L319. In later releases (2.4.+) StorageLevel is explicitly set to `MEMORY_ONLY`, so that they can be LRU-discarded as others mention. – mazaneicha Feb 07 '21 at 14:53
  • By the way, isn't it time to migrate off of Spark 2.3, and from RDD- to Dataframe-based graph implementation? https://github.com/graphframes/graphframes/tree/v0.8.1 – mazaneicha Feb 07 '21 at 14:55
  • But the question is more about lineage I think. The memory only is of course for iterative processing. So we need checkpointing. – thebluephantom Feb 07 '21 at 16:46
  • @igeass could you include the looping code here too - maybe there's something in there we could also look at because I'm guessing running the above code once doesn't give you any problems right? – Ranvir Mohanlal Feb 07 '21 at 20:45

0 Answers