0

I have two RDD's:

RDD1: data in RDD1 is in unicode format

[[u'a',u'b',u'c'],[u'c',u'f',u'a'],[u'ab',u'cd',u'gh']...]

RDD2:

[(10.1, 10.0), (23.0, 34.0), (45.0, 23.0),....]

Both the RDDs have same number of rows (but one has 2 columns/elements in each row/record and one has 3). Now what I want to do is take all elements from RDD2 and 2nd record from RDD1 and write them out to a csv file on the local file system (not hdfs). So the output in the csv file for above sample will be:

a,b,c,10.0
c,f,a,34.0
ab,cd,gh,23.0

How can I do that in PySpark?

UPDATE: This is my current code:

columns_num = [0,1,2,4,7]
rdd1 = rdd3.map(lambda row: [row[i] for i in columns_num])

rdd2 = rd.map(lambda tup: (tup[0], tup[1]+ (tup[0]/3)) if tup[0] - tup[1] >= tup[0]/3 else (tup[0],tup[1]))

with open("output.csv", "w") as fw:
    writer = csv.writer(fw)
    for (r1, r2) in izip(rdd1.toLocalIterator(), rdd2.toLocalIterator()):
        writer.writerow(r1 + tuple(r2[1:2]))

I am getting error as TypeError: can only concatenate list (not "tuple") to list. If I do writer.writerow(tuple(r1) + r2[1:2]) then I get error as UnicodeEncodeError: 'ascii' codec can't encode character u'\x80' in position 16: ordinal not in range(128) `

Jason Donnald
  • 2,256
  • 9
  • 36
  • 49

1 Answers1

2

If by local you mean driver file system then you can simply collect or convert toLocalIterator and write:

import csv
import sys
if sys.version_info.major == 2:
    from itertools import izip
else:
    izip = zip

rdd1 = sc.parallelize([(10.1, 10.0), (23.0, 34.0), (45.0, 23.0)])
rdd2 = sc.parallelize([("a", "b" ," c"), ("c", "f", "a"), ("ab", "cd", "gh")])

with open("output.csv", "w") as fw:
    writer = csv.writer(fw)
    for (r1, r2) in izip(rdd2.toLocalIterator(), rdd1.toLocalIterator()):
        writer.writerow(r1 + r2[1:2])
zero323
  • 322,348
  • 103
  • 959
  • 935
  • If you use Python 2 and your data contains characters that cannot be encoded using ascii you'll have reencode or create rows manually without help of csv writer. – zero323 Dec 06 '15 at 00:21
  • How can I do that? I think in my data there is a text like `2\x80` and that is why I am getting the error – Jason Donnald Dec 06 '15 at 00:36
  • http://stackoverflow.com/q/18766955/1560062, http://stackoverflow.com/q/3085263/1560062, http://stackoverflow.com/q/5838605/1560062 – zero323 Dec 06 '15 at 09:56
  • thanks! One thing though, is it possible to write to the csv file using `csv writer` with `;` as the `delimiter` and not `,`? – Jason Donnald Dec 07 '15 at 14:10