I'm trying to implement a naive version of PageRank in Apache Spark (1.4.0) with Python. The details of the algorithm (the way it should work) can be found here (look about a third of the way down, at the matrix H with stationary vector I).
PageRank is iterative: at each step, each vertex emits a share of its current PageRank to each of its neighbors, and the reduce step then sums the PageRank sent to each vertex. This results in a loop in which the PageRank RDD is repeatedly replaced (RDDs are read-only, so each iteration actually creates a new RDD). In principle, one should be able to use RDD.cache() to keep the current PageRank RDD in memory between iterations. (A toy example of one iteration is sketched just below.)
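To make the per-iteration map/reduce concrete, here is the same update written in plain Python on a made-up 3-vertex graph (toy data, not my actual input):

    # Toy graph (made up): 1 -> 2, 3   2 -> 3   3 -> 1; every vertex starts at rank 1/3.
    ranks = {1: 1 / 3.0, 2: 1 / 3.0, 3: 1 / 3.0}
    edges = {1: [2, 3], 2: [3], 3: [1]}

    # "map" step: each vertex sends rank / out_degree to each of its out-neighbors.
    contributions = []
    for v, outs in edges.items():
        for w in outs:
            contributions.append((w, ranks[v] / float(len(outs))))

    # "reduce" step: sum the contributions arriving at each vertex.
    new_ranks = {}
    for w, share in contributions:
        new_ranks[w] = new_ranks.get(w, 0.0) + share

    print new_ranks  # vertex 1 gets 1/3, vertex 2 gets 1/6, vertex 3 gets 1/6 + 1/3 = 0.5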
The problem I have is that, even with the .cache(), the RDD appears to be re-computed from scratch at each iteration of the loop. I know the RDD fits in memory (the input file I'm using is very small, and the RDD only has 8 elements). My code is below:
from pyspark import SparkContext, SparkConf
import sys

def my_map(line):
    # The structure of line is (vertex, ([list of outgoing edges], current_PageRank))
    out_edges = line[1][0]
    current_PageRank = line[1][1]
    e = len(out_edges)
    if e > 0:
        return_list = []
        for f in out_edges:
            return_list.append((f, current_PageRank / float(e)))
        return return_list
    return []  # a vertex with no out-edges contributes nothing (returning None here would break flatMap)
conf = SparkConf().setAppName("PageRank")
sc = SparkContext(conf=conf)
fileName = sys.argv[1]
# lines is an RDD (list) where each element of the RDD is a string (one line of the text file).
lines = sc.textFile(fileName)
# edge_list is an RDD where each element is the list of integers from a line of the text file.
# edge_list is cached because we will refer to it numerous times throughout the computation.
# Each element of edge_list is of the form (vertex, [out neighbors]), so (int, list).
edge_list = lines.map(lambda line: (int(line.split()[0]), [int(x) for x in line.split()[1:]]) ).cache()
# vertex_set is an RDD that is the list of all vertices.
vertex_set = edge_list.map(lambda row: row[0])
# N is the number of vertices in the graph.
N = vertex_set.count()
# Initialize the PageRank vector.
# Each vertex will be keyed with its initial value (1/N where N is the number of vertices)
# Elements of Last_PageRank have the form (vertex, PageRank), so (int, float).
Last_PageRank = vertex_set.map(lambda x: (x, 1.0/N) ).cache()
for number in xrange(40):
    Last_PageRank = edge_list.join(Last_PageRank).flatMap(my_map).reduceByKey(lambda a, b: a + b).cache()
    ### In version 2, I comment the previous and last line out, and un-comment the following 3 lines.
    #LList = edge_list.join(Last_PageRank).flatMap(my_map).reduceByKey(lambda a, b: a + b).collect()
    #print LList
    #Last_PageRank = sc.parallelize(LList)

print Last_PageRank.collect()
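In case the input format matters: each line of the file is a vertex followed by its out-neighbors, which is what the lambda building edge_list expects (my real file is similarly tiny, just 8 such lines). A made-up example of the format:

    1 2 3
    2 3
    3 1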
To give an idea of why I believe the caching is not working, I timed the code above with the number of iterations of the loop set to 5, 10, 15, ..., 40. Then I changed the code so that at each step I collect the RDD with RDD.collect(), print it to the screen, and use sc.parallelize() to re-distribute the list as an RDD. When I did this, the computation was significantly faster. The timing data (without the .collect() work-around) is as follows:
Num Iterations    Time (s)
      5            14.356
     10            27.783
     15            47.983
     20            75.019
     25           108.298
     30           148.345
     35           195.525
     40           248.699
By contrast, when I use the .collect() work-around, the 40-iteration version takes only 43.922s. Note also that in the table above the marginal cost of each additional 5 iterations keeps growing (roughly 13s, 20s, 27s, 33s, 40s, 47s, 53s), which is what I would expect if each new iteration were recomputing all of the earlier ones. If the caching were working as I thought it should, the original version should take (at most) about the same 43.9s.
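For completeness, the loop body in the .collect() work-around (version 2) is exactly the three commented-out lines above, un-commented:

    for number in xrange(40):
        LList = edge_list.join(Last_PageRank).flatMap(my_map).reduceByKey(lambda a, b: a + b).collect()
        print LList
        Last_PageRank = sc.parallelize(LList)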
Any help is appreciated. By the way, I'd appreciate explanations in lay terms where possible (I'm a self-taught programmer; my formal background is in mathematics).