
I have written code with Pregel in Spark that processes a graph, but it runs very slowly even on a small dataset. I have written Pregel programs before, but this one is really slow. My cluster consists of 2 workers, each with a Core i5 CPU and 6 GB of RAM. This is the code I have written in Pregel:

```scala
import org.apache.spark.graphx._
import scala.collection.mutable
import scala.reflect.ClassTag

// `most_similar`, `broadcastVariable`, `beta` and `p` are not shown in the question;
// they are defined elsewhere in the program and captured by the closures below.
def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int) = {

  // Each vertex starts with its own id as its only label, with score 1.0.
  val temp_graph = graph.mapVertices { case (vid, _) =>
    mutable.HashMap[VertexId, (Double, VertexId)](vid -> (1.0, vid))
  }

  // Sends each endpoint's whole label map to the other endpoint,
  // on every edge and in every superstep, whether or not the labels changed.
  def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId, (Double, VertexId)], ED])
      : Iterator[(VertexId, mutable.HashMap[VertexId, List[(Double, VertexId)]])] = {
    val msg1 = e.dstAttr.map { case (k, v) => (k, List(v)) }
    val msg2 = e.srcAttr.map { case (k, v) => (k, List(v)) }
    Iterator((e.srcId, msg1), (e.dstId, msg2))
  }

  // Combines two messages by concatenating their per-label score lists.
  def mergeMessage(
      count1: mutable.HashMap[VertexId, List[(Double, VertexId)]],
      count2: mutable.HashMap[VertexId, List[(Double, VertexId)]])
      : mutable.HashMap[VertexId, List[(Double, VertexId)]] = {
    val communityMap = new mutable.HashMap[VertexId, List[(Double, VertexId)]]
    (count1.keySet ++ count2.keySet).foreach { key =>
      communityMap += key -> (count1.getOrElse(key, Nil) ::: count2.getOrElse(key, Nil))
    }
    communityMap
  }

  def vertexProgram(
      vid: VertexId,
      attr: mutable.HashMap[VertexId, (Double, VertexId)],
      message: mutable.HashMap[VertexId, List[(Double, VertexId)]])
      : mutable.HashMap[VertexId, (Double, VertexId)] = {

    if (message.isEmpty) attr
    else {
      // Most similar neighbour of this vertex, looked up once per call instead of once per label.
      // Falls back to -1 (assumed not to be a real vertex id) instead of throwing a MatchError.
      val max_similar = most_similar.find(x => x._1 == vid).map(_._2).getOrElse(-1L)

      val labels_score: mutable.HashMap[VertexId, Double] = message.map { case (label, scores) =>
        // Score sent by the most similar neighbour for this label, or 0 if it sent none.
        val maxSimilar_result = scores.find(_._2 == max_similar).map(_._1).getOrElse(0D)

        // Sum of incoming scores, each weighted by the broadcast similarity of its sender.
        var value_sum = scores.map { case (score, sender) =>
          score * broadcastVariable.value(vid)(sender)._2
        }.sum

        // NOTE: `+=` evaluates to (1 + beta) * value_sum + (1 - beta) * maxSimilar_result.
        // If a weighted average of the two terms was intended, this should be `=`.
        value_sum += (beta * value_sum) + ((1 - beta) * maxSimilar_result)

        (label, value_sum) // (label, score)
      }

      // Divide every score by the maximum score.
      val max_value = labels_score.values.max
      val dividedByMax: mutable.HashMap[VertexId, (Double, Double)] =
        labels_score.map { case (label, score) => (label, (score, score / max_value)) }

      // Keep labels whose score (before dividing by the max) is >= p; the question uses p = 0.75.
      val resultMap = dividedByMax.filter { case (_, (score, _)) => score >= p }

      // If no label passes, keep a single label (an arbitrary entry of the hash map).
      val xx = if (resultMap.isEmpty) dividedByMax.take(1) else resultMap

      // Normalize the kept scores so that they sum to 1, tagging each with this vertex id.
      val rr = xx.map { case (label, (score, _)) => (label, score) }
      val max_for_normalize = rr.values.sum

      val res: mutable.HashMap[VertexId, (Double, VertexId)] =
        rr.map { case (label, score) => (label, (score / max_for_normalize, vid)) }

      res
    }
  }

  val initialMessage = mutable.HashMap[VertexId, List[(Double, VertexId)]]()

  val overlapCommunitiesGraph = Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
    vprog = vertexProgram,
    sendMsg = sendMessage,
    mergeMsg = mergeMessage)

  overlapCommunitiesGraph
}
```
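The `most_similar` lookup in `vertexProgram` linearly scans the collection every time it runs, and it sits in the hot path of every vertex in every superstep. A minimal sketch of the usual alternative, assuming `most_similar` is a local sequence of `(vertex, mostSimilarNeighbour)` pairs (its real type is not shown in the question): build a `Map` once, so each lookup is a hash access instead of a scan. In Spark such a map could be shipped to the executors with `SparkContext.broadcast`, as the question already does for `broadcastVariable`. `MostSimilarLookup` and its members are hypothetical names.

```scala
object MostSimilarLookup {
  type VertexId = Long // GraphX defines VertexId as an alias for Long

  // Hypothetical stand-in for `most_similar`: (vertex, its most similar neighbour).
  val mostSimilarPairs: Seq[(VertexId, VertexId)] = Seq(1L -> 2L, 2L -> 1L, 3L -> 1L)

  // As in the question: a linear scan on every lookup; -1 means "no entry".
  def lookupByScan(vid: VertexId): VertexId =
    mostSimilarPairs.filter(_._1 == vid).headOption.map(_._2).getOrElse(-1L)

  // Built once; afterwards each lookup is a single hash-map access.
  val mostSimilarIndex: Map[VertexId, VertexId] = mostSimilarPairs.toMap

  def lookupByIndex(vid: VertexId): VertexId =
    mostSimilarIndex.getOrElse(vid, -1L)

  def main(args: Array[String]): Unit =
    for (v <- Seq(1L, 2L, 3L, 4L))
      println(s"$v -> scan=${lookupByScan(v)}, index=${lookupByIndex(v)}")
}
```

If `most_similar` contains duplicate keys, `toMap` keeps the last pair while `filter(...).headOption` keeps the first, so the two versions only agree when each vertex appears once.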

Can anyone explain what causes this slow execution? Is it right that, because I have only 2 workers and Pregel does a lot of message passing and reducing, the performance of my code degrades badly on big datasets?
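A rough way to gauge how much message passing this code causes: `sendMessage` emits a message to both endpoints of every edge unconditionally, and GraphX's `Pregel` (with its default `activeDirection = EdgeDirection.Either`) keeps calling `sendMsg` on edges next to any vertex that received a message in the previous superstep. Every connected vertex therefore stays active, and the loop runs all `maxSteps` supersteps. The plain-Scala sketch here counts the (label, score) entries shipped in one superstep for a hypothetical toy graph; `MessageVolume` and its method are illustrative names, not part of GraphX.

```scala
object MessageVolume {
  type VertexId = Long

  // Number of (label -> score) entries shipped in one superstep when, as in
  // `sendMessage`, every edge sends each endpoint's whole label map to the other.
  def entriesPerSuperstep(edges: Seq[(VertexId, VertexId)],
                          labelCount: Map[VertexId, Int]): Int =
    edges.map { case (src, dst) => labelCount(src) + labelCount(dst) }.sum

  def main(args: Array[String]): Unit = {
    // Hypothetical toy graph: a 4-cycle.
    val edges = Seq(1L -> 2L, 2L -> 3L, 3L -> 4L, 4L -> 1L)
    // First superstep: every vertex holds only its own label.
    val start = Map(1L -> 1, 2L -> 1, 3L -> 1, 4L -> 1)
    // Later supersteps: suppose each vertex keeps 3 labels above the threshold.
    val later = start.map { case (v, _) => v -> 3 }
    println(entriesPerSuperstep(edges, start)) // 8
    println(entriesPerSuperstep(edges, later)) // 24
  }
}
```

The count scales with the number of edges times the number of labels each endpoint holds, and that cost is paid again in every superstep.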

mrsrinivas
Two possibilities: 1) Your app uses all the memory in the workers. This would require code improvements or a redesign to optimize execution. 2) Your app is limited to a certain amount of memory in the workers. This means that there is physical memory available but your app is not allowed to use it. To learn more about your environment, please share the command used to submit the job and the memory consumption on the nodes while the job is running. – rsantiago Sep 09 '20 at 00:43
I have recently started using Spark and GraphX, but apparently the Pregel API in GraphX is very slow. I tried to find out why, but I haven't found any documentation. I implemented shortest paths from one node to all the others and the delay was overwhelming, so I checked the implementation of the class 'org.apache.spark.graphx.lib.ShortestPaths', and it was just as lousy and can fail. – Mandy007 Sep 25 '20 at 18:13
@Mandy007 Thank you for your reply. As you said, I think Pregel is very slow, but I don't know why people say Pregel is optimized and fast. Or maybe, to get good performance from Pregel, we need more machines to handle the volume of messages. I think if we implemented our algorithm with plain RDDs instead of Pregel, it would be much faster. –  Sep 27 '20 at 05:00
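For comparison, here is a sketch of one superstep written by hand, as the comment above suggests: build per-edge messages with `flatMap`, then merge them per target vertex. It uses plain Scala collections, with `groupBy` plus `reduce` standing in locally for `RDD.flatMap` followed by `reduceByKey`, and mirrors the shapes of the question's `sendMessage` and `mergeMessage` using immutable maps. `RddStyleSuperstep` and its members are hypothetical names.

```scala
object RddStyleSuperstep {
  type VertexId = Long
  type Labels = Map[VertexId, (Double, VertexId)]    // label -> (score, vertex that set it)
  type Msg = Map[VertexId, List[(Double, VertexId)]] // label -> scores received for it

  // Like `sendMessage`: each edge sends each endpoint's labels to the other endpoint.
  def send(src: VertexId, dst: VertexId, labels: Map[VertexId, Labels]): Seq[(VertexId, Msg)] = {
    def wrap(l: Labels): Msg = l.map { case (k, v) => k -> List(v) }
    Seq(src -> wrap(labels(dst)), dst -> wrap(labels(src)))
  }

  // Like `mergeMessage`: concatenate the per-label score lists of two messages.
  def merge(a: Msg, b: Msg): Msg =
    (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, Nil) ::: b.getOrElse(k, Nil))).toMap

  // One superstep: flatMap over edges, then merge per target vertex
  // (the local analogue of rdd.flatMap(...).reduceByKey(merge)).
  def superstep(edges: Seq[(VertexId, VertexId)], labels: Map[VertexId, Labels]): Map[VertexId, Msg] =
    edges
      .flatMap { case (s, d) => send(s, d, labels) }
      .groupBy(_._1)
      .map { case (v, msgs) => v -> msgs.map(_._2).reduce((x, y) => merge(x, y)) }

  def main(args: Array[String]): Unit = {
    // Hypothetical path graph 1 - 2 - 3; each vertex starts with only its own label.
    val edges = Seq(1L -> 2L, 2L -> 3L)
    val labels: Map[VertexId, Labels] =
      Seq(1L, 2L, 3L).map(v => v -> Map(v -> (1.0, v))).toMap
    superstep(edges, labels).toSeq.sortBy(_._1).foreach(println)
  }
}
```

An equivalent RDD rewrite still has to build and shuffle the same per-edge messages, so on its own it would not shrink the message volume that Pregel is processing here.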

0 Answers