
I have to write data as individual JPG files (~millions) from PySpark to an S3 bucket.

Setup: AWS EMR cluster and a Jupyter notebook.

I tried multiple options:

  1. Create a boto3 client inside the `foreach` function and write to S3 ==> too slow and inefficient, since a new client is opened for every record.

import boto3
import requests

def get_image(y):
    img_url, cid = y  # assuming each record carries an image URL and a content id
    res = requests.get(img_url, stream=True)
    client = boto3.client('s3')
    file_name = str(cid) + ".jpg"
    client.put_object(Body=res.content, Bucket='test', Key='out_images/' + file_name)

myRdd.foreach(get_image)
  2. Write to the local file system and run an `aws s3 cp` to S3 => Not clear how to access this data once it is written to each individual worker node's volume. I logged into the worker nodes while the job was running but couldn't find where the JPGs were written. (See the sketch with a fixed output directory after the snippet below.)

import requests

def get_image(y):
    img_url, cid = y  # assuming each record carries an image URL and a content id
    res = requests.get(img_url, stream=True)
    file_name = "./" + str(cid) + ".jpg"  # resolves to the executor's working directory
    with open(file_name, 'wb') as f:
        f.write(res.content)

myRdd.foreach(get_image)
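
For reference, a minimal sketch of what approach 2 could look like with a fixed output directory, assuming every worker has a writable scratch path (the /mnt/tmp_images directory here is hypothetical) and the AWS CLI is available for the copy step afterwards:

import os
import requests

OUT_DIR = "/mnt/tmp_images"  # hypothetical scratch directory on every worker

def get_image_local(y):
    img_url, cid = y  # assuming each record carries an image URL and a content id
    res = requests.get(img_url, stream=True)
    os.makedirs(OUT_DIR, exist_ok=True)
    with open(os.path.join(OUT_DIR, str(cid) + ".jpg"), 'wb') as f:
        f.write(res.content)

myRdd.foreach(get_image_local)
# afterwards, on each worker node: aws s3 sync /mnt/tmp_images s3://test/out_images/

The drawback remains that the copy step has to be run on every worker node separately.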
  3. Write to HDFS and run an `s3-dist-cp` later. Probably the most efficient, but I haven't had success with the code yet; I get "path cannot be found" exceptions.

import requests

def get_image(y):
    img_url, cid = y  # assuming each record carries an image URL and a content id
    res = requests.get(img_url, stream=True)
    file_name = "hdfs://" + str(cid) + ".jpg"
    # open() treats "hdfs://..." as a local path, which is why the path-not-found errors appear
    with open(file_name, 'wb') as f:
        f.write(res.content)

myRdd.foreach(get_image)
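
As a rough illustration of approach 3 only: one possible way to write to HDFS from inside the workers is pyarrow's HadoopFileSystem, assuming pyarrow is installed on the executors and the Hadoop native library (libhdfs) and CLASSPATH are configured on the EMR nodes; the /tmp/out_images path is illustrative:

import requests
from pyarrow import fs

def get_image_hdfs(y_it):
    hdfs = fs.HadoopFileSystem("default")  # connects using the cluster's Hadoop configuration
    hdfs.create_dir("/tmp/out_images", recursive=True)
    for y in y_it:
        img_url, cid = y  # assuming each record carries an image URL and a content id
        res = requests.get(img_url, stream=True)
        with hdfs.open_output_stream("/tmp/out_images/" + str(cid) + ".jpg") as f:
            f.write(res.content)

myRdd.foreachPartition(get_image_hdfs)
# then: s3-dist-cp --src hdfs:///tmp/out_images --dest s3://test/out_images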

Can someone suggest a good approach to achieving this?

sudarshan kakumanu
  • what if you partition by the number of rows and then write to S3 this way? I suppose then you could use a script using boto3 to change the format of each file. `rows = df.count()`, `df.repartition(rows).write.avro('save-dir')` – dadadima May 22 '21 at 14:42
  • I have 500 million images to be written. I don't see re-partitioning as an ideal solution here. – sudarshan kakumanu May 22 '21 at 18:34

1 Answer


Solution 1 works great if `foreach` is replaced with `foreachPartition`. After this change, only one client is created per partition:

import boto3
import requests

def get_image(y_it):
    client = boto3.client('s3')  # created once per partition
    for y in y_it:
        img_url = ...
        cid = ...
        res = requests.get(img_url, stream=True)
        file_name = str(cid) + ".jpg"
        client.put_object(Body=res.content, Bucket='test', Key='out_images/' + file_name)

myRdd.foreachPartition(get_image)

Within the loop over y_it, the same client is reused.

Things can become even faster if a requests.Session is used for the HTTP calls, as described in this answer. In this case, a single HTTP session is created outside of the loop over `y_it` (like the client) and then reused within the loop.
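
A minimal sketch of that variant, assuming the same placeholders as above for the image URL and the content id:

import boto3
import requests

def get_image(y_it):
    client = boto3.client('s3')
    session = requests.Session()  # one HTTP session per partition, reuses connections
    for y in y_it:
        img_url = ...
        cid = ...
        res = session.get(img_url, stream=True)
        file_name = str(cid) + ".jpg"
        client.put_object(Body=res.content, Bucket='test', Key='out_images/' + file_name)

myRdd.foreachPartition(get_image)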

werner
  • Thanks for responding. I tried it out for a sample set and it is definitely faster compared to approach 1. I am gonna try again on the full dataset. Just curious if this is the only way and I'd like to hear your thoughts on Approach 3. – sudarshan kakumanu May 24 '21 at 04:44
  • I would stick to approach 1 as this approach can handle everything within Spark and you don't need any external tool. If you want to save data to HDFS within `get_image` you could check [this answer](https://stackoverflow.com/a/46962688/2129801). It's written in Java, but the idea should also work in Python – werner May 24 '21 at 13:53