
I am using Spark/GraphFrames from Python and from R. When I call PageRank on a small graph from Python, it is a lot slower than with R. Why is it so much slower with Python, considering that both Python and R are calling the same libraries?

I'll try to demonstrate the problem below.

Spark/GraphFrames includes example graphs, such as friends, as described in this link. This is a very small directed graph with 6 nodes and 8 edges (note that the example differs from the one shipped with other versions of GraphFrames).


When I run the following piece of code with R, it takes almost no time to calculate PageRank:

library(graphframes)
library(sparklyr)
library(dplyr)

nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')

sc <- spark_connect(master = "local", version = "2.1.1")

nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)

graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
print(ranks$vertices)

results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)

print(results)

When I run the equivalent with PySpark, it takes 10 to 30 minutes:

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

if __name__ == '__main__':

    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()

I tried different versions of Spark and GraphFrames for Python so that they were aligned with the R setup.

  • Likely because the code is not fully equivalent. In particular, these will result in a different number of partitions, which escalates further downstream. See for example [Spark iteration time increasing exponentially when using join](https://stackoverflow.com/q/31659404). If you want to make it somewhat equivalent to the `sparklyr` code, set `sc.conf.set("spark.sql.shuffle.partitions", 1)` at the beginning of your Python code - it won't scale, but it will run fast on such a small graph (as [higher parallelism is not always better](https://stackoverflow.com/q/41090127)) – zero323 Oct 05 '18 at 11:11
  • Thank you, I think you answered my question. I should indeed have added that the Python code was able to run successfully on very large instances (I didn't try large instances with R). I had the impression it had something to do with parallelism, but I wasn't aware of the `spark.sql.shuffle.partitions` parameter. Thanks a lot! – joel314 Oct 05 '18 at 11:17
  • Happy to help. I don't have time right now to trace it and find the exact culprit, but if you want to investigate it further, the problem must be introduced before `PageRank` is actually invoked, likely in `indexedEdges`. `PageRank` is implemented using the older API, so it won't be affected by these settings. – zero323 Oct 05 '18 at 11:22

1 Answer


In general, when you see such significant runtime differences between pieces of code that are apparently equivalent in different backends, you have to consider two possibilities:

  • They are not really equivalent. Despite using the same Java libraries under the hood, the paths which different languages use to interact with the JVM are not the same, and when the code reaches the JVM, it might not use the same call chain.
  • The methods are equivalent but the configuration and/or data distribution is not the same.

In this particular case, the first and most obvious reason is how you load the data.

However, as far as I can tell, the way the data is loaded shouldn't affect the runtime in this particular case. Moreover, the path the code takes before it reaches the JVM backend doesn't seem to differ enough between the two cases to explain the difference.

This suggests that the problem lies somewhere in the configuration. In general, there are at least two options which can significantly affect data distribution, and therefore the execution time:

  • spark.default.parallelism - used with the RDD API to determine the number of partitions in different cases, including the default post-shuffle distribution. For possible implications see for example Spark iteration time increasing exponentially when using join.

    It doesn't look like it affects your code here.

  • spark.sql.shuffle.partitions - used with the Dataset API to determine the number of partitions after a shuffle (groupBy, join, etc.).

    While the PageRank code itself uses the old GraphX API, and this parameter is not directly applicable there, the data is indexed with the Dataset API before it is passed to the older API.

    If you check the source you'll see that both indexedEdges and indexVertices use joins, and therefore depend on spark.sql.shuffle.partitions.

    Furthermore, the number of partitions set by the aforementioned methods will be inherited by the GraphX Graph object, significantly affecting the execution time.

    If you set spark.sql.shuffle.partitions to a minimum value:

    # given an existing SparkSession named `spark`
    spark.conf.set("spark.sql.shuffle.partitions", 1)

    the execution time on such small data should be negligible.
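
For reference, here is a minimal sketch of how that could look in the question's PySpark script (it mirrors the code from the question; the only assumption is that the setting is passed through the builder rather than set after the session is created):

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

if __name__ == '__main__':

    # Use a single post-shuffle partition; Spark's default of 200 only adds
    # scheduling overhead on a graph with a handful of vertices.
    spark = SparkSession.builder \
        .master("local") \
        .config("spark.sql.shuffle.partitions", 1) \
        .getOrCreate()

    g = Graphs(spark).friends()

    # Same call as in the question; the indexing joins GraphFrames performs
    # before handing the graph to GraphX now produce a single partition.
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()

With the default of 200 shuffle partitions, each iteration instead schedules a couple of hundred mostly empty tasks, which is the likely source of the minutes-long runtime on such a tiny graph.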

Conclusion:

Your environments are likely using different values of spark.sql.shuffle.partitions.

General Directions:

If you see behavior like this and want to roughly narrow down the problem, take a look at the Spark UI and see where things diverge. In this case you're likely to see significantly different numbers of tasks.
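
As a rough complement to the UI, here is a small sketch (reusing the `spark`, `g` and `results` names from the script above) that prints the effective setting and the partition counts it leads to:

# Effective post-shuffle partition count used by the Dataset API.
print(spark.conf.get("spark.sql.shuffle.partitions"))

# Partition counts of the input vertices and of the PageRank output;
# large numbers on a 6-vertex graph mean most tasks carry no data.
print(g.vertices.rdd.getNumPartitions())
print(results.vertices.rdd.getNumPartitions())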

zero323