I have a list of directed edges which represent a tree. 'u v' means u is a child of v.
from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)
lines = sc.textFile("/content/sample_data/data.txt")
lines.take(10)
['0 0', '1 0', '2 0', '3 1', '4 1', '5 2', '6 2', '7 3', '8 3', '9 4']
I converted the above to the following form, stored as intermediate:
[(0, ('out', 0)),
(0, ('in', 0)),
(1, ('out', 0)),
(0, ('in', 1)),...]
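The conversion is along these lines (a minimal sketch; the helper name to_records is just for illustration):

# For an edge "u v" (u is a child of v), emit (u, ('out', v)) for the
# parent link and (v, ('in', u)) for the child link.
def to_records(line):
    u, v = map(int, line.split())
    return [(u, ('out', v)), (v, ('in', u))]

intermediate = lines.flatMap(to_records)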
From the above, I am trying to build an adjacency list of the following form:
[(8721, [('out', 4360), ('in', 17443), ('in', 17444)]),
(9291, [('out', 4645), ('in', 18583), ('in', 18584)]),
(9345, [('out', 4672), ('in', 18691), ('in', 18692)]),..]
Here, the first row says that 8721 is a child of 4360, and that 17443 and 17444 are children of 8721.
I am using the groupByKey or reduceByKey methods exposed by Spark:
intermediate.groupByKey().mapValues(list)
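For completeness, the reduceByKey variant I mean is along these lines (a sketch; each value is wrapped in a one-element list so that + concatenates them):

# Wrap each value in a list, then concatenate lists per key.
adjacency = (intermediate
             .mapValues(lambda v: [v])
             .reduceByKey(lambda a, b: a + b))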
The groupByKey line is taking a lot of time: almost 250 seconds for 100 MB of test data on an 8-core machine with 12 GB of RAM. I eventually have to run this on >15 GB of data in a distributed environment.
I understand that groupByKey shuffles data across all nodes. Is there any way to avoid it in my case? Any suggestions on how to optimise this operation are appreciated.
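For concreteness, this is the kind of direction I am wondering about, e.g. hash-partitioning intermediate by key up front so the grouping stays local to each partition (I am not sure whether this actually avoids the shuffle or just moves it earlier):

# Pre-partition by key, then group; the number of partitions is a placeholder.
partitioned = intermediate.partitionBy(sc.defaultParallelism)
adjacency = partitioned.groupByKey().mapValues(list)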