
I tried to use Spark to work on a simple graph problem. I found an example program in the Spark source folder, transitive_closure.py, which computes the transitive closure of a graph with no more than 200 edges and vertices. But on my own laptop it runs for more than 10 minutes and doesn't terminate. The command line I use is: spark-submit transitive_closure.py.

I wonder why Spark is so slow even when computing such a small transitive closure. Is this a common case? Is there any configuration I am missing?

The program is shown below; it is also included in the Spark distribution available from the Spark website.

from __future__ import print_function

import sys
from random import Random

from pyspark import SparkContext

numEdges = 200
numVertices = 100
rand = Random(42)


def generateGraph():
    edges = set()
    while len(edges) < numEdges:
        src = rand.randrange(0, numEdges)
        dst = rand.randrange(0, numEdges)
        if src != dst:
            edges.add((src, dst))
    return edges


if __name__ == "__main__":
    """
    Usage: transitive_closure [partitions]
    """
    sc = SparkContext(appName="PythonTransitiveClosure")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    tc = sc.parallelize(generateGraph(), partitions).cache()

    # Linear transitive closure: each round grows paths by one edge,
    # by joining the graph's edges with the already-discovered paths.
    # e.g. join the path (y, z) from the TC with the edge (x, y) from
    # the graph to obtain the path (x, z).

    # Because join() joins on keys, the edges are stored in reversed order.
    edges = tc.map(lambda x_y: (x_y[1], x_y[0]))

    oldCount = 0
    nextCount = tc.count()
    while True:
        oldCount = nextCount
        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
        # then project the result to obtain the new (x, z) paths.
        new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
        tc = tc.union(new_edges).distinct().cache()
        nextCount = tc.count()
        if nextCount == oldCount:
            break

    print("TC has %i edges" % tc.count())

    sc.stop()
Alberto Bonsanto
c21

2 Answers


There can be many reasons why this code doesn't perform particularly well on your machine, but most likely this is just another variant of the problem described in "Spark iteration time increasing exponentially when using join". The simplest way to check whether that is the case is to set the spark.default.parallelism parameter on submit:

bin/spark-submit --conf spark.default.parallelism=2 \
  examples/src/main/python/transitive_closure.py

Unless limited otherwise, SparkContext.union, RDD.join and RDD.union set the number of partitions of the child to the total number of partitions in the parents. Usually this is the desired behavior, but it can become extremely inefficient when applied iteratively, because the partition count then grows with every pass through the loop.
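To get a feel for how quickly this adds up, here is a small plain-Python model of the loop in the question. It does not call Spark at all; it just applies the rule above literally (child partitions = sum of the parents' partitions) and assumes that map() and distinct() keep the partition count unchanged and that spark.default.parallelism is not set. The function name partitions_after is made up for this illustration.

```python
def partitions_after(iterations, tc_partitions=2, edge_partitions=2):
    """Model the partition count of `tc` after each loop iteration.

    Assumption: join and union produce as many partitions as their
    parents have in total; map and distinct leave the count unchanged.
    """
    counts = [tc_partitions]
    for _ in range(iterations):
        # new_edges = tc.join(edges).map(...)
        joined = tc_partitions + edge_partitions
        # tc = tc.union(new_edges).distinct()
        tc_partitions = tc_partitions + joined
        counts.append(tc_partitions)
    return counts


print(partitions_after(5))  # [2, 6, 14, 30, 62, 126]
```

Under these assumptions the count roughly doubles on every iteration, so a few hundred tuples end up spread over far more partitions (and tasks) than there are records to process.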

zero323
    Thank you. Really helpful. I have one more question; if you can help, I will be very grateful. Suppose I have a program that uses many relational operations like join, select, union, update, etc. in a loop until the facts in the relations reach a fixed point. Even with no more than 50 tuples in total, I get stuck on the second iteration with a Java heap size exception. I have used cache() and coalesce(1) on every dataframe operation. What do you think the problem could be? – c21 Feb 24 '16 at 16:02

The usage says the command line is

transitive_closure [partitions]

Setting the default parallelism will only help with the joins in each partition, not the initial distribution of work.

I'm going to argue that MORE partitions should be used. Setting the default parallelism may still help, but the code you posted sets the number explicitly (the argument passed, or 2 if none is given). The absolute minimum should be the number of cores available to Spark; otherwise you're always working at less than 100%.
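Note that the partitions argument in the posted script is only applied at sc.parallelize; the later join and distinct calls choose their own partition counts. Whatever number you settle on, one way to make it stick is to pass it through explicitly. A rough sketch, assuming tc and edges are the pair RDDs from the question; closure_step and num_partitions are hypothetical names, and the only Spark features relied on are the optional numPartitions arguments of RDD.join and RDD.distinct.

```python
def closure_step(tc, edges, num_partitions):
    """One iteration of the loop with the partition count pinned.

    `tc` holds (y, z) paths and `edges` holds reversed (y, x) edges,
    as in the question's script.
    """
    # Join on y, then project (y, (z, x)) to the new path (x, z).
    new_edges = tc.join(edges, num_partitions).map(
        lambda kv: (kv[1][1], kv[1][0]))
    # union() still concatenates partitions; distinct() shuffles
    # the result back down to num_partitions.
    return tc.union(new_edges).distinct(num_partitions)
```

Inside the while loop this would replace the two lines that build new_edges and tc, for example tc = closure_step(tc, edges, partitions).cache().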

AssHat_
  • There is no value in increasing parallelism here. Actually, given the amount of data, you can gain more by reducing it to 1 :) Not to mention dropping Spark altogether. – zero323 Feb 23 '16 at 14:22
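
For comparison, here is what "dropping Spark" could look like for a graph this small: a plain-Python sketch of the same linear transitive closure loop, using sets instead of RDDs. The function name transitive_closure and the preds index are choices made for this illustration, not part of the Spark example.

```python
def transitive_closure(edges):
    """Linear transitive closure over a set of (src, dst) pairs.

    Mirrors the Spark loop: each round extends every known path
    (y, z) with every edge (x, y) to obtain the path (x, z), and
    stops when a round adds nothing new.
    """
    # Index edges by destination, playing the role of the reversed
    # `edges` RDD that the join is keyed on.
    preds = {}
    for x, y in edges:
        preds.setdefault(y, set()).add(x)

    tc = set(edges)
    while True:
        new_paths = {(x, z) for (y, z) in tc for x in preds.get(y, ())}
        grown = tc | new_paths
        if len(grown) == len(tc):
            return tc
        tc = grown


print(sorted(transitive_closure({(1, 2), (2, 3), (3, 4)})))
# [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4)]
```

Passing the result of generateGraph() from the question's script to this function should give the same set of paths as the Spark version, without any scheduling or shuffle overhead.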