
I'm trying to implement a naive version of PageRank in Apache Spark (1.4.0) with Python. The details of the algorithm (the way it should work) can be found here (look about a third of the way down at the matrix H with the stationary vector I).

PageRank is iterative: at each step, each vertex emits a share of its current PageRank to its neighbors, and the reduce function then collects the PageRank sent to each vertex. This results in a loop where RDDs are recycled and updated (RDDs are read-only, so each iteration actually creates a new RDD). In principle, one should be able to use RDD.cache().
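To make the update concrete, each round computes (my paraphrase of the naive scheme above, with no damping factor):

PR_{k+1}(v) = sum over edges u -> v of PR_k(u) / outdeg(u)

where outdeg(u) is the number of outgoing edges of u. This is exactly what the join/flatMap/reduceByKey pipeline in the code below implements.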

The problem I have is that, even with .cache(), the RDD appears to be re-computed at each iteration of the loop. I know the RDD fits in memory (the input file I'm using is very small, so the RDD only has 8 elements). My code is below:

from pyspark import SparkContext, SparkConf
import sys

def my_map(line):
    # The structure of line is (vertex, ([list of outgoing edges], current_PageRank)).
    out_edges = line[1][0]
    current_PageRank = line[1][1]
    e = len(out_edges)
    if e > 0:
        return_list = []
        for f in out_edges:
            return_list.append((f, current_PageRank / float(e)))
        return return_list
    # A vertex with no outgoing edges emits nothing; return an empty
    # list so that flatMap does not fail on a None result.
    return []



conf = SparkConf().setAppName("PageRank")
sc = SparkContext(conf=conf)
fileName = sys.argv[1]

# lines is an RDD where each element is a string (one line of the text file).
lines = sc.textFile(fileName)

# edge_list is an RDD where each element is the list of integers from a line of the text file.
# edge_list is cached because we will refer to it numerous times throughout the computation.
# Each element of edge_list is of the form (vertex, [out neighbors]), so (int, list).
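# Example: an input line "1 2 3" parses to the tuple (1, [2, 3]).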
edge_list = lines.map(lambda line: (int(line.split()[0]), [int(x) for x in line.split()[1:]]) ).cache()

# vertex_set is an RDD that is the list of all vertices.
vertex_set = edge_list.map(lambda row: row[0])

# N is the number of vertices in the graph.
N = vertex_set.count()

# Initialize the PageRank vector.
# Each vertex will be keyed with its initial value (1/N where N is the number of vertices)
# Elements of Last_PageRank have the form (vertex, PageRank), so (int, float).
Last_PageRank = vertex_set.map(lambda x: (x, 1.0/N) ).cache()


for number in xrange(40):
    Last_PageRank = edge_list.join(Last_PageRank).flatMap(my_map).reduceByKey(lambda a, b: a+b).cache()

    ### In version 2, I comment out the previous line and un-comment the following 3 lines.
    #LList = edge_list.join(Last_PageRank).flatMap(my_map).reduceByKey(lambda a, b: a+b).collect()
    #print LList
    #Last_PageRank = sc.parallelize(LList)

print Last_PageRank.collect()

To give an idea of why I believe the caching is not working, I timed the code above with 5, 10, 15, ..., 40 iterations of the loop. Then I changed the code so that at each step it collects the RDD with RDD.collect(), prints it to the screen, and uses sc.parallelize() to re-distribute the list as an RDD. When I did this, the computation was significantly faster. The timing data (without the .collect()) is as follows:

Num Iterations    Time (s)
 5                 14.356
10                 27.783
15                 47.983
20                 75.019
25                108.298
30                148.345
35                195.525
40                248.699

By contrast, when I use the .collect() work-around, the 40-iteration version takes only 43.922s. If the caching were working as I thought it should, I would expect the original version to take at most about 44s.

Any help is appreciated. By the way, I'd appreciate explanations in lay terms, as I'm a self-taught programmer (a mathematician by education).

  • Thanks @zero323, this is the same issue. For brevity, the solution is to specify the number of partitions when calling join. The answer given in that post explains in detail why. – TravisJ Sep 24 '15 at 15:04
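For illustration, here is a minimal sketch of that fix applied to the loop above. The partition count (8) is a hypothetical value I chose only for the example; passing it to join (and to reduceByKey) keeps the partitioning stable across iterations, and the linked answer explains in detail why that prevents the re-computation:

num_partitions = 8  # hypothetical value, chosen only for illustration

for number in xrange(40):
    # Fixing the number of partitions in join and reduceByKey keeps the
    # partitioner consistent from one iteration to the next.
    Last_PageRank = edge_list.join(Last_PageRank, num_partitions) \
                             .flatMap(my_map) \
                             .reduceByKey(lambda a, b: a + b, num_partitions) \
                             .cache()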
