
I have an RDD which I created using PySpark; after joining by key it is around 600 GB and looks exactly like this:

[('43.72_-70.08', (('0744632', -70.08, 43.72, '2.4'), '18090865')),
 ('43.72_-70.08', (('0744632', -70.08, 43.72, '2.4'), '18090865')),
 ('43.25_-67.58', (('0753877', -67.58, 43.25, '7.2'), '18050868')),
 ('43.01_-75.24', (('0750567', -75.24, 43.01, '7.2'), '18042872'))]

I want something like this, sorted by the first element:

['0744632', '18090865', '2.4',
'0744632', '18090865', '2.4',
'0750567', '18042872', '7.2',
'0753877', '18050868', '7.2']

Is there a way I can extract the data from the tuples and get the output in the required format?

Note: this is a 600 GB RDD with more than a million distinct values in the first column and approximately 15 billion rows, so I would really appreciate an optimized approach if possible.
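
For reference, the sample rows above can be reproduced locally as a small toy RDD (a minimal sketch, assuming an existing SparkContext named sc):

sample = [('43.72_-70.08', (('0744632', -70.08, 43.72, '2.4'), '18090865')),
          ('43.72_-70.08', (('0744632', -70.08, 43.72, '2.4'), '18090865')),
          ('43.25_-67.58', (('0753877', -67.58, 43.25, '7.2'), '18050868')),
          ('43.01_-75.24', (('0750567', -75.24, 43.01, '7.2'), '18042872'))]
rdd = sc.parallelize(sample)  # same (key, (tuple, value)) structure as the joined RDD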

Sami

3 Answers


Do this in your Spark cluster, e.g.:

In []:
(rdd.map(lambda x: (x[1][0][0], x[1][1], x[1][0][3]))
 .sortBy(lambda x: x[0])
 .flatMap(lambda x: x)
 .collect())

Out[]:
['0744632', '18090865', '2.4', '0744632', '18090865', '2.4', '0750567',
 '18042872', '7.2', '0753877', '18050868', '7.2']

Alternatively:

In []:
import operator as op

(rdd.map(lambda x: (x[1][0][0], x[1][1], x[1][0][3]))
 .sortBy(lambda x: x[0])
 .reduce(op.add))

Out[]:
('0744632', '18090865', '2.4', '0744632', '18090865', '2.4', '0750567',
 '18042872', '7.2', '0753877', '18050868', '7.2')

This seems like a rather unwieldy structure; if you meant a list of tuples, then simply eliminate the flatMap():

In []:
(rdd.map(lambda x: (x[1][0][0], x[1][1], x[1][0][3]))
 .sortBy(lambda x: x[0])
 .collect())

Out[]:
[('0744632', '18090865', '2.4'),
 ('0744632', '18090865', '2.4'),
 ('0750567', '18042872', '7.2'),
 ('0753877', '18050868', '7.2')]
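
Note that collect() (and reduce) bring the entire result back to the driver, which is unlikely to work for a 600 GB RDD. A minimal sketch of writing the sorted result out from the executors instead, assuming a text file under a placeholder path is an acceptable output format:

In []:
(rdd.map(lambda x: (x[1][0][0], x[1][1], x[1][0][3]))
 .sortBy(lambda x: x[0])
 .map(lambda x: ','.join(x))            # one comma-separated line per record
 .saveAsTextFile('/path/to/output'))    # placeholder path; written in parallel, nothing is collected to the driver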
AChampion
  • I got this error ... Is it because I am running out of memory? **Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 42 tasks (1107.9 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)** – Sami Apr 27 '18 at 04:13
  • Change your max result size, e.g. `sc.getConf().set("spark.driver.maxResultSize", "2g")` – AChampion Apr 27 '18 at 04:21
  • This is not helping, I still get the same error. Can you please help me with this error. – Sami Apr 27 '18 at 15:11
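
Following up on the maxResultSize comments above: spark.driver.maxResultSize generally has to be set before the SparkContext is created, so calling set() on the conf returned by a running context has no effect. A minimal sketch, assuming a SparkSession is being built (the app name is just a placeholder):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('flatten-rdd')                       # placeholder app name
         .config('spark.driver.maxResultSize', '2g')   # must be set before the context starts
         .getOrCreate())
sc = spark.sparkContext

Even so, collecting 15 billion rows to the driver will keep hitting this limit; writing the result out with saveAsTextFile (see the sketch in the answer above) avoids the driver bottleneck entirely.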

This is a simple one-line solution:

sorted([(x[1][0][0], x[1][1], x[1][0][3]) for x in your_list]) 
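
For example, applied to the four sample rows from the question (assuming they are available on the driver as a plain Python list named your_list), this gives:

[('0744632', '18090865', '2.4'),
 ('0744632', '18090865', '2.4'),
 ('0750567', '18042872', '7.2'),
 ('0753877', '18050868', '7.2')]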

I think it's slightly faster than a lambda-based solution, based on this post: What is the difference between these two solutions - lambda or loop - Python

Kenan

Similar to the other Spark answer:

rdd = rdd.map(lambda x: [x[1][0][0], x[1][1], x[1][0][3]]) \
         .sortBy(lambda row: row[0])

You can also use reduce instead of flatMap:

rdd.reduce(lambda x,y: x+y)
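
On the sample rows this should give the flat list from the question:

['0744632', '18090865', '2.4', '0744632', '18090865', '2.4', '0750567',
 '18042872', '7.2', '0753877', '18050868', '7.2']

(For 15 billion rows, though, that concatenated result lives on the driver, so this is only practical for small data.)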
ags29