
I have created a GraphFrame in Spark, and the graph currently looks as follows:

Basically, there will be a lot of such subgraphs, each disconnected from the others. Given a particular node ID, I want to find all the other nodes within the same subgraph. For instance, if node ID 1 is given, the traversal should return 2, 10, 20, 3, 30.

I have created a motif but it doesn't give the right result.

testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()

Unfortunately, the connected components function considers the whole graph. Is it possible to get all the nodes within a disconnected subgraph, given a particular node ID, using GraphFrame/GraphX?

sjishan
  • But what's the issue with working on the whole graph? The way the `connectedComponents` algorithm works, it makes it very easy to find the disconnected subgraphs within the original graph. Basically, everything in the same subgraph gets its `Vertex.attr` set to the smallest `VertexID` in the subgraph. At that point, finding all the vertices connected to a given node is as easy as finding all the vertices in the `connectedComponents` result graph that have the same `vertex.attr` value. – David Griffin May 27 '16 at 20:09
  • For example, in your graph pictured above, after running `connectedComponents`, every vertex in the left-hand subgraph would have its `attr` set to `1L`. Every vertex in the right-hand subgraph would have its `attr` set to `4L`. Then you can just use simple `RDD` operations to `filter` out the nodes that are in the correct subgraph. – David Griffin May 27 '16 at 20:15
  • Hi @DavidGriffin I know that. However, running `connectedComponents` is really slow when you have billions of nodes and you need to find just three or four such subgraphs. In such cases, computing `connectedComponents` is really costly. – sjishan May 28 '16 at 03:26
  • In general it won't be cheaper than `connectedComponents`. That being said, it works just fine for me. Are you sure you didn't mix up the types in the filter expression? – zero323 May 28 '16 at 12:00
  • Hi @zero323 Can you explain why you think it won't be cheaper? My understanding is that GraphFrame is based on DataFrame, so when I want to traverse from one particular node ID, it will first look that ID up in the nodes DataFrame and then traverse to find the other nodes associated with it. That is unlike a normal graph search, where you may need to start at a random node and keep traversing until you find the node ID, and only then traverse to get the other nodes associated with it. – sjishan May 28 '16 at 18:12
  • The problem is that the search is performed using joins. Assuming that the initial node has low cardinality, it will be handled by a broadcast join, but in the worst-case scenario it will require full-blown joins. – zero323 May 28 '16 at 19:10
  • can you provide the code that you use to generate `testgraph`? – Boris Gorelik Jun 09 '16 at 12:42
  • I have a similar issue. I have billions of nodes and need to find connected components. I would love to get input from people who have done something similar. – Shirish Kumar Oct 26 '16 at 01:56
  • @ShirishKumar did you ever find anyone who had done something similar? I ran into a similar problem in my question: https://stackoverflow.com/questions/46396605/efficiently-calculating-connected-components-in-pyspark/46431863#46431863 – oliver Sep 28 '17 at 01:54
  • @oliver I ended up writing my own version of Connected Component using Map-reduce on Spark. GraphX was not scaling well for my use case, and as far as I know, GraphX is not getting much attention either. You can check it here: https://github.com/kwartile/connected-component. I have not thought about how I could address the problem stated above. – Shirish Kumar Sep 28 '17 at 17:55
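
The approach suggested in the comments above (label every vertex with `connectedComponents`, then keep the vertices that share the target's label) could be sketched as follows. This is only a minimal sketch: `ComponentLookup` and `componentOf` are hypothetical names, not part of GraphX, and the whole graph is still processed, which is exactly the cost the asker is worried about.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object ComponentLookup {
  // Hypothetical helper (not part of GraphX): IDs of all vertices that are
  // in the same connected component as `target`.
  def componentOf[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      target: VertexId): RDD[VertexId] = {
    // connectedComponents labels each vertex with the smallest vertex ID
    // found in its component.
    val labels = graph.connectedComponents().vertices.cache()

    labels.lookup(target).headOption match {
      // Keep the vertices that carry the same label as the target.
      case Some(label) => labels.filter { case (_, l) => l == label }.keys
      // The target is not in the graph at all.
      case None => labels.sparkContext.emptyRDD[VertexId]
    }
  }
}
```

The labels are cached because they are read twice: once to look up the target's label and once to filter on it.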

1 Answer

The connected component that contains a specific vertex can be obtained with a BFS traversal that starts from this vertex and collects its neighbors over successive hops. This can be done through the Pregel API offered by GraphX, for which we need to implement a vertex program (`vprog`), a `sendMsg` function and a `mergeMsg` function. The algorithm is triggered by the reception of an initial message. The center then sends a message to its neighbors, which propagate it to their own neighbors, and so on until the whole connected component is covered. Every vertex that receives a message is marked as checked so that it won't be activated again in the following iterations.
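
For intuition, the same traversal can be written as a single-machine BFS in plain Scala. This is only an illustrative sketch and not the GraphX code: `LocalBfs` and `reachableFrom` are hypothetical names, and the edge list is assumed to fit in memory and to be treated as undirected.

```scala
import scala.collection.mutable

object LocalBfs {
  // Hypothetical helper: all vertices reachable from `start`, including
  // `start` itself, when edges are followed in both directions.
  def reachableFrom(edges: Seq[(Long, Long)], start: Long): Set[Long] = {
    // Undirected adjacency map: vertex -> set of neighbours.
    val adj: Map[Long, Set[Long]] = edges
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

    val visited = mutable.Set(start) // plays the role of the 'checked' flag
    var frontier = Set(start)        // vertices that forward the message
    while (frontier.nonEmpty) {
      // The next frontier is made of the unvisited neighbours of the current one.
      frontier = frontier
        .flatMap(v => adj.getOrElse(v, Set.empty[Long]))
        .filterNot(visited)
      visited ++= frontier
    }
    visited.toSet
  }
}
```

With a made-up edge list such as `Seq((1L, 2L), (2L, 3L), (1L, 10L), (2L, 20L), (3L, 30L), (4L, 5L))`, calling `LocalBfs.reachableFrom(edges, 1L)` gives the vertices 1, 2, 3, 10, 20 and 30, while 4 and 5 are left out. In the Pregel formulation, each pass of the `while` loop corresponds to one superstep.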

Here is the implementation of this approach:

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object ConnectedComponent extends  Serializable {

    def main(args: Array[String]): Unit = {
        
        val conf = new SparkConf().setAppName("ConnectedComponent").setMaster("local")
        val sc = new SparkContext(conf)
        val vRDD = sc.objectFile[(VertexId,Int)]("/path/to/vertex/rdd/file/")
        val eRDD = sc.objectFile[Edge[Int]]("/path/to/edge/rdd/file/")
        val graph = Graph(vRDD, eRDD)
        // Pick an arbitrary start vertex; replace it with the vertex ID of interest.
        val center = graph.pickRandomVertex()
        val cc = extractCC(graph, center)
        cc.vertices.collect.foreach(println)

        sc.stop()
    }

    /** Returns the subgraph of `g` formed by the connected component that contains `center`. */
    def extractCC(g: Graph[Int, Int], center: VertexId): Graph[Int, Int] = {
        // Wrap every vertex label with the traversal state; nothing is visited yet.
        val initialGraph = g.mapVertices((id, attr) => VertexData(attr, false, false, center))
        // The initial message 0 activates 'center'; every later message is 1.
        initialGraph.pregel(initialMsg = 0)(vprog, sendMsg, mergeMsgs)
            .subgraph(vpred = (id, vdata) => vdata.checked) // keep visited vertices only
            .mapVertices((id, vdata) => vdata.attr)         // restore the original labels
    }


    case class VertexData(attr: Int,          // label of the vertex
                          checked: Boolean,   // true once the vertex has been visited
                          propagate: Boolean, // whether the vertex forwards messages
                          center: VertexId)   // ID of the connected component's center

    def vprog(id: VertexId, vdata: VertexData, msg: Int): VertexData = {
        if (!vdata.checked && ((msg == 0 && id == vdata.center) || msg == 1)) {
            // First activation: the center on the initial message (0),
            // any other vertex on its first neighbor message (1).
            vdata.copy(checked = true, propagate = true)
        } else if (vdata.checked && msg == 1) {
            // Already visited: stop forwarding.
            vdata.copy(propagate = false)
        } else {
            vdata
        }
    }

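    def sendMsg(triplet: EdgeTriplet[VertexData, Int]): Iterator[(VertexId, Int)] = {
        // Edges are followed in both directions. A propagating endpoint only
        // notifies a neighbor that has not been visited yet, so each vertex is
        // activated at most once and the traversal terminates.
        val toSrc =
          if (triplet.dstAttr.propagate && !triplet.srcAttr.checked) Iterator((triplet.srcId, 1))
          else Iterator.empty
        val toDst =
          if (triplet.srcAttr.propagate && !triplet.dstAttr.checked) Iterator((triplet.dstId, 1))
          else Iterator.empty
        toSrc ++ toDst
    }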

    def mergeMsgs(a: Int, b: Int): Int = math.max(a, b)
}
PhiloJunkie