
I have a PySpark application that (obviously) loads and transforms data.

I want to persist the result RDD to S3, but the Spark-supplied .saveAsTextFile() function does not satisfy my requirements, because it writes multiple RDD entries into one file.

For example, let's assume that the RDD resultRDD is:

[('cats', cats_data), ('dogs', dogs_data), ('flamingos', flamingos_data)]

When calling resultRDD.saveAsTextFile('s3n://path/to/somewhere/'), it'll create multiple files, which may look like this:

1. 000_part00 [containing cats_data & dogs_data]
2. 000_part01 [containing only flamingos_data]

Note that the number of files created is not related to the number of elements in the RDD. Furthermore, I can't even know what's inside each file before opening it.

Instead, what I want to create is the following output:

1. cats [containing only cats_data]
2. dogs [containing only dogs_data]
3. flamingos [containing only flamingos_data]

I thought I might use boto's S3Connection and write to S3 manually, like this:

import boto

s3_connection = boto.connect_s3()  # credentials picked up from the environment / boto config
bucket = s3_connection.get_bucket('animals_data')

def persist_to_s3(data_tuple):
    s3_key = bucket.new_key(data_tuple[0])
    s3_key.set_contents_from_string(data_tuple[1])

resultRDD.foreach(persist_to_s3)

Unfortunately, the connection and bucket objects are neither serializable nor thread safe (I presume), so I can't share the connection between nodes, like above.

I thought I might connect to S3 & get the bucket inside the persist_to_s3 function itself, but this operation will surely make AWS throttle my API usage, since I have a massive RDD.

Clarification: My actual dataset is huge, and the keys are all unique. So re-partitioning by key won't help in this case.

The other option that crossed my mind is to use repartition() / coalesce() in order to reduce the number of partitions and then perform the above operation with mapPartitions(), which will work but will be much slower.
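
For reference, here is a rough sketch of what I mean by that variant (a sketch only, not tested: I would probably use foreachPartition() rather than mapPartitions() since there is nothing to collect, and the partition count of 16 is just an arbitrary placeholder):

import boto

def persist_partition_to_s3(partition):
    # one connection and bucket lookup per partition, not per record
    bucket = boto.connect_s3().get_bucket('animals_data')
    for key_name, data in partition:
        bucket.new_key(key_name).set_contents_from_string(data)

resultRDD.coalesce(16).foreachPartition(persist_partition_to_s3)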

  • What else can I do in order to quickly persist the data to S3 in the format I described?
  • If using repartition() / coalesce() is the only proper way, which one is the better fit for this use case?
matan129
  • Sounds interesting, can you elaborate..? How is this different from `coalesce()`ing? – matan129 Dec 30 '15 at 21:28
  • A Partitioner requires a `getPartition` method which maps a `key` to a partition id. See for example Daniel's answer here http://stackoverflow.com/a/23228151/1560062 How you define this mapping is up to you. So you can put all dogs on one partition, cats on the next one, and so on. – zero323 Dec 30 '15 at 21:31
  • Thanks for the help. Yes, it'd be nice if you reopened my question (I'll also go ahead and clarify the key uniqueness in the Q. body). Although the link you provided gives me some idea of how to solve it, it'd be awesome if I could get a PySpark solution (not Scala), and an answer about `repartition()` vs. `coalesce()`. – matan129 Dec 30 '15 at 21:34
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/99349/discussion-between-zero323-and-matan129). – zero323 Dec 30 '15 at 21:36
  • I've not got a complete solution in Pyspark, but you could definitely do something with `rdd.foreachPartition`, `boto` and `threading` like this http://ls.pwd.io/2013/06/parallel-s3-uploads-using-boto-and-threads-in-python/ – Ewan Leith Dec 31 '15 at 09:54

1 Answer


The other option that crossed my mind is to use repartition() / coalesce() in order to reduce the number of partitions and then perform the above operation with mapPartitions(), which will work but will be much slower.

repartition & mapPartitions is the relatively fast option, but you mentioned that it is slow. I think you could consider a more traditional solution like a multi-threaded reader/writer:

1. Write the result data out (to HDFS or local disk) in the format you described;
2. Use a multi-threaded reader/writer model to write the data to S3 in parallel. The workflow looks like: parallel readers --read into--> concurrent blocking queue --> parallel writers --write to--> S3.
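
A minimal sketch of that reader/writer model (Python 2 with boto; the output directory, queue size and thread count below are placeholders, and it assumes step 1 produced one local file per key):

import os
import boto
from Queue import Queue
from threading import Thread

OUTPUT_DIR = '/data/animals_out'   # placeholder: directory produced in step 1

def writer(queue):
    # each writer thread holds its own S3 connection
    bucket = boto.connect_s3().get_bucket('animals_data')
    while True:
        item = queue.get()
        if item is None:           # poison pill -> stop this writer
            break
        key_name, path = item
        bucket.new_key(key_name).set_contents_from_filename(path)

queue = Queue(maxsize=100)         # the concurrent blocking queue
writers = [Thread(target=writer, args=(queue,)) for _ in range(8)]
for t in writers:
    t.start()

# the "reader" side: feed (key, file path) pairs into the queue;
# the actual file reads happen in the writer threads, in parallel
for name in os.listdir(OUTPUT_DIR):
    queue.put((name, os.path.join(OUTPUT_DIR, name)))

for _ in writers:
    queue.put(None)                # one poison pill per writer
for t in writers:
    t.join()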

Shawn Guo
  • When saying that `repartition()` is slow, I meant that it'll take longer because there will be less parallelism when writing, whereas if I used the normal `saveAsTextFile()` method, Spark would do its own optimizations. Using a separate reader/writer is not a good solution, because I'll have the same overhead as with Spark repartitioning... – matan129 Dec 31 '15 at 05:01
  • repartition(numPartitions) does not mean less parallelism; it could be adjusted to even more partitions than saveAsTextFile() uses, and get more parallelism – Shawn Guo Dec 31 '15 at 05:46