
I have a PySpark dataframe that contains records for 6 million people, each with an individual userid. Each userid has 2000 entries. I want to save each userid's data into a separate csv file with the userid as the name.

I have some code that does this, taken from the solution to this question. However, as I understand it, the code will try to create a partition for each of the 6 million ids. I don't actually care about this as I'm going to write each of these files to another non-HDFS server.

I should note that the code works for a small number of userids (up to 3000) but it fails on the full 6 million.

Code:

output_file = '/path/to/some/hdfs/location'
myDF.write.partitionBy('userid').mode('overwrite').format("csv").save(output_file)

When I run the above it takes WEEKS to run, with most of that time spent on the writing step. I assume this is because of the number of partitions. Even if I manually set the number of partitions to something small, it still takes ages to execute.

Question: Is there a way to save each userid's data into a single, well-named file (name of file = userid) without partitioning?

Sal
  • This doesn't sound like a good use-case for hdfs. Can't you change your filesystem? – BlackBear Nov 09 '18 at 15:58
  • I don't know if changing my file system for a single task is the best solution. Are there other file systems that are better at handling 2000x6million rows of data? – Sal Nov 09 '18 at 16:31
  • sorry I really was in a hurry when I wrote that, I realized how silly it was just after I left :D perhaps you can change the way you save. Do you really need separate files? Do you really need _files_? Perhaps some sort of database would be better? – BlackBear Nov 09 '18 at 17:08
  • My plan is to run a separate analysis on each of the 2000 documents (across all 6 million users). Querying a database of that size is a lot of work. I've tried uploading to MySQL but my server can't handle creating the proper indexes which would speed up any queries. If I process the data once and have each of the 2000 documents in a separate file then I don't need any special selects or sorting. – Sal Nov 09 '18 at 20:18

1 Answer


Given the requirements there is really not much hope for improvement. HDFS is not designed for handling very small files, and pretty much any file system will be challenged if you try to open 6 million file descriptors at the same time.

You can improve this a little, if you haven't already, by calling repartition before the write so that all rows for a given userid land in the same partition and each userid directory ends up with a single output file:

(myDF
    .repartition('userid')
    .write.partitionBy('userid').mode('overwrite').format("csv").save(output_file))

If you can accept multiple ids per file, you can use a persistent table and bucketing:

(myDF
    .write
    .bucketBy(1024, 'userid')  # adjust the number of buckets as needed
    .sortBy('userid')
    .mode('overwrite').format("csv")
    .saveAsTable(output_table))

and process each file separately, taking consecutive chunks of data.
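Because sortBy('userid') keeps the rows within each bucket file ordered by userid, a downstream script can stream one file at a time and split it into per-user blocks without any further sorting. A minimal sketch, assuming the userid is the first CSV column and a hypothetical handle_user function does the per-user analysis:

import csv
from itertools import groupby

def process_bucket_file(path, userid_col=0):
    # Stream one bucket file and handle each userid's rows as a contiguous block.
    with open(path, newline='') as f:
        reader = csv.reader(f)
        # Rows in a bucket file are sorted by userid, so all rows for a given
        # userid appear consecutively and can be grouped lazily.
        for userid, rows in groupby(reader, key=lambda row: row[userid_col]):
            handle_user(userid, list(rows))  # hypothetical per-user analysis step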

Finally, if plain text output is not a hard requirement, you can use any sharded database and partition the data by userid.
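As a rough illustration of pushing the data into a database from Spark (not a sharded setup per se), you could repartition by userid and use the built-in JDBC writer; the URL, table name and credentials below are placeholders, and a genuinely sharded database would route rows by userid on its own side:

(myDF
    .repartition(200, 'userid')  # keep all rows for a userid in one partition
    .write
    .mode('overwrite')
    .jdbc("jdbc:mysql://dbhost:3306/mydb", "user_events",
          properties={"user": "spark_user", "password": "..."}))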

  • I think this is on the right track. However, I would store the data in parquet format with it sorted as shown here. This way your analysis queries per userid will be easier. Maybe you could then remove the buckets altogether. Sorting the data will be slow because of the shuffles - but ultimately it's this sort that you are missing. – simon_dmorias Dec 11 '18 at 18:17
  • The first suggestion is a slight improvement on my solution but still not what I need. The second suggestion says "multiple ids per file" which is the exact opposite of what I am asking. (Not saying that to be a jerk, I appreciate the help.) The third suggestion I addressed in the comments on the original question before this answer was posted: yes, I can upload to a database (MySQL for example) but putting indexes on 2000 x 6 million records is more than my system can handle. – Sal Dec 14 '18 at 01:14