
I have an rdd with this structure:

[ 
    ('Washington', 
      [
        {'age': 15, 'name': 'John', 'extra_info1': 'data'},
        {'age': 25, 'name': 'David', 'extra_info1': 'data'}
      ]),
    ('New York',
      [
        {'age': 50, 'name': 'Mike', 'extra_info2': 'blob'},
        {'age': 24, 'name': 'Fred', 'extra_info2': 'blob3'}
      ])
]

As you can see, the key is the city and the value is a list of dicts of the people in it. All the dictionaries share some keys like 'age' and 'name', but each dictionary also has keys unique to it.

To output it to csv I iterate over each key of the rdd, convert the list of dictionaries to an rdd of pyspark.sql.Row, create a dataframe from that rdd, and then use com.databricks.spark.csv to save each dataframe as csv in HDFS.

I do it like that:

from pyspark.sql import Row

for key in rdd.keys().toLocalIterator():
    city_rdd = rdd.filter(lambda kv: kv[0] == key)
    city_rdd = city_rdd.flatMap(lambda kv: kv[1])  # return only the dicts, one per record, without the key
    city_rdd_rows = city_rdd.map(lambda r: Row(**r))
    city_df = city_rdd_rows.toDF()
    # save the city_df to csv with com.databricks.spark.csv.. i dont have the snippet here
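
For reference, the save step with com.databricks.spark.csv usually looks something like the sketch below; the output path here is a placeholder, not the original snippet:

# hypothetical output path; spark-csv writes a directory of part files, not a single file
city_df.write \
    .format('com.databricks.spark.csv') \
    .option('header', 'true') \
    .save('hdfs:///output/{}'.format(key))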

The problem is that I have many keys like Washington and New York, and each file save takes 1-2 minutes, instead of saving all the csvs in parallel and saving time.

I have read this, but I couldn't make it work with csv output, and when I tried json output I saw that the json keys were only the shared keys like 'age' and 'name' from my rdd.

What should i do?


1 Answer


You could write a Python function that writes the csv files to a network location all your workers can reach, and then use .map() to execute it, like this:

import csv

def csv_writer(data):
    city, mydicts = data

    with open('//network/location/{}.csv'.format(city), 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        for single_dict in mydicts:
            for key, value in single_dict.items():
                writer.writerow([key, value])


rdd.map(csv_writer).count()  # .count() is needed to initiate the action
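
If you also want the city-specific keys preserved as columns (the issue you saw with the json output), a variant of the same idea using csv.DictWriter could look like the sketch below. Like the code above, it assumes a network path every worker can write to; foreach is used instead of map().count() since it is the idiomatic action for side effects:

import csv

def csv_dict_writer(data):
    city, mydicts = data
    # union of all keys in this city, so columns unique to one dict are not dropped
    fieldnames = sorted({k for d in mydicts for k in d})
    with open('//network/location/{}.csv'.format(city), 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, restval='')
        writer.writeheader()
        for single_dict in mydicts:
            writer.writerow(single_dict)

rdd.foreach(csv_dict_writer)  # runs once per (city, dicts) pair, in parallel across workers

Each city is handled by a single task, so the writes for different cities run in parallel across the cluster instead of one after another on the driver.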