
I am uploading a file through a Flask REST API. Because the file is large, I am using Celery to save it on the server. The code is below.

Flask Rest API

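from flask import request
from werkzeug.utils import secure_filename

@app.route('/upload', methods=['GET', 'POST'])
def upload():
    file = request.files.get("file")

    # Reject requests with no file part or with an empty filename.
    if not file:
        return "some_error_msg"
    elif file.filename == "":
        return "some_error_msg"

    filename = secure_filename(file.filename)

    # ABC is the file content I want to hand to the task (this is what fails).
    result = task_upload.apply_async(args=(filename, ABC), queue="upload")

    return "some_task_id"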

Celery task

@celery_app.task(bind=True)
def task_upload(self, filename: str, contents: Any) -> bool:
    status = False
    
    try:
        status = save_file(filename, contents)
    except Exception as e:
        print(f"Exception: {e}")

    return status

Save method

def save_file(filename: str, contents: Any) -> bool:

    file: Path = MEDIA_DIRPATH / filename
    status: bool = False

    # Method 1: contents is a Flask FileStorage object.
    if contents:
        contents.save(file)
        status = True

    # Method 2 (alternative to method 1): contents is request.stream,
    # a BytesIO-like object that is copied to disk in chunks.
    # with open(file, "ab") as fp:
    #     chunk_size = 4091
    #     while True:
    #         chunk = contents.read(chunk_size)
    #         if not chunk:
    #             break
    #         fp.write(chunk)
    #     status = True

    return status

Both methods fail with an error.

For method 1, I passed the file variable (a FileStorage object) and got this error:

exc_info=(<class 'kombu.exceptions.EncodeError'>, EncodeError(TypeError('Object of type FileStorage is not JSON serializable'))

For method 2, I passed request.stream and got this error:

<gunicorn.http.body.Body object at some number>
TypeError: Object of type Body is not JSON serializable

How can I pass the file (ABC) to the Celery task?
I prefer method 1, but either will do. Please suggest.
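
Both errors appear to share one cause: Celery's default JSON serializer can only encode plain types such as strings, numbers, lists, and dicts, so a file-like object in the task arguments fails before the task is ever sent. A minimal sketch that reproduces the same kind of TypeError with only the standard library follows. Here io.BytesIO is an assumed stand-in for the FileStorage or gunicorn Body object, and is_json_serializable is a hypothetical helper name used only for illustration.

```python
import io
import json


def is_json_serializable(value) -> bool:
    """Return True if json.dumps can encode value, False otherwise."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


# A file-like object, standing in for FileStorage or gunicorn's Body.
stream = io.BytesIO(b"file contents")

print(is_json_serializable(("report.pdf", stream)))             # False
print(is_json_serializable(("report.pdf", "/tmp/report.pdf")))  # True
```

The second call succeeds because both arguments are plain strings, which suggests passing a reference to the file (such as a path) rather than the file object itself.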

winter
  • my guess is you have to set the mime type/headers correctly, flask might be defaulting to assume you are trying to send json – Matthias Feb 03 '23 at 09:52
  • Hi Matthias, thanks. I searched for a suitable content type but did not find one. During the search I also learned that these objects are not serializable. I tried pickle as the task_serializer too, but at some point it broke my app, so I reverted to json. So I need to try a different approach instead of passing the entire file. – winter Feb 04 '23 at 09:00
  • Hi Matthias, I am trying to pass just the filename to the Celery task. Inside the task, can I directly access the stream address given by gunicorn and copy from there? Is that possible? – winter Feb 04 '23 at 15:48
  • I'm not sure, just try it. Also might be worth looking online for example "how to upload large file async using flask" – Matthias Feb 06 '23 at 09:19
  • Hi Matthias, thanks. I tried accessing the stream address (the gunicorn body), but that approach failed. Yes, searching for async opened up a few options: gevent, processes, threading, and the nginx upload module. I will go through each of these, and surely one of them will work. – winter Feb 07 '23 at 12:46
  • excellent, when you figure it out you can post your solution here to help others – Matthias Feb 08 '23 at 10:15
  • Hi Matthias, Sure I will post it and update soon. – winter Feb 08 '23 at 12:15
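
One way to act on the idea from the comments (pass only a name or path to the task instead of the whole file) is to write the upload to a staging directory inside the request handler and hand the resulting path to Celery. The sketch below is an illustration under assumptions, not the approach the thread settled on: stage_upload, staging_dir, and the example.bin filename are hypothetical, io.BytesIO stands in for the uploaded stream, and the web process and the Celery worker are assumed to share a filesystem.

```python
import io
import json
import shutil
import tempfile
from pathlib import Path


def stage_upload(stream, staging_dir: Path, filename: str) -> str:
    """Copy an uploaded stream into staging_dir and return the path as a string."""
    target = staging_dir / filename
    with open(target, "wb") as fp:
        shutil.copyfileobj(stream, fp)  # copies in chunks, not all at once
    return str(target)


staging_dir = Path(tempfile.mkdtemp())
upload = io.BytesIO(b"x" * 10_000)  # stand-in for the uploaded file stream

staged_path = stage_upload(upload, staging_dir, "example.bin")

# Only plain strings go into the task arguments, so JSON encoding succeeds.
task_args = json.dumps(["example.bin", staged_path])
```

In the Flask view this would correspond to something like task_upload.apply_async(args=(filename, staged_path), queue="upload"). Note that the bytes are still received during the request; only the work done on the file afterwards moves to the worker.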

1 Answer


You can use any of the following: gevent, multiprocessing, multithreading, or the nginx upload module.

For my use case, a thread was the better fit. This is the pseudocode structure.

 from threading import Event, Thread


 class UploadWorker(Thread):
     """Create an upload worker background thread."""

     def __init__(self, name: str, daemon: bool, filename: str, contents: str, read_length: int) -> None:
         """Initialize the defaults."""
         self.filename: str = filename
         self.contents: str = contents
         self.read_length: int = read_length
         self._kill: Event = Event()
         super().__init__(name=name, daemon=daemon)

     def run(self) -> None:
         """Run the target function."""

         print(f"Thread is_set: {self._kill.is_set()=}")

         while not self._kill.is_set():

             save_file(self.filename, self.contents, self.read_length)
             # Better to copy the function code directly here instead of calling
             # the function, so the kill flag can be checked between chunks.
             break  # stop after one save; otherwise the loop re-saves until killed

     def kill(self) -> None:
         """Revoke or abort the running thread."""
         self._kill.set()
         

Then create the background worker object and start it

upload_worker = UploadWorker("uploader", True, filename, contents, read_length)
upload_worker.start()

To kill or cancel use

upload_worker.kill()

Reference: Is there any way to kill a Thread?
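
The note about copying the save logic into run() can be made concrete: if the chunk loop lives inside the thread, the kill flag is checked before every read, so kill() takes effect between chunks instead of only before the save starts. Below is a minimal, self-contained sketch of that idea. ChunkedUploadWorker, the completed attribute, and the 4096-byte default are hypothetical names and values for illustration, and io.BytesIO stands in for the uploaded stream (whether a real request stream stays readable after the view returns depends on the server).

```python
import io
import tempfile
from pathlib import Path
from threading import Event, Thread
from typing import BinaryIO


class ChunkedUploadWorker(Thread):
    """Copy a stream to a file in chunks, checking a stop flag between chunks."""

    def __init__(self, stream: BinaryIO, target: Path, read_length: int = 4096) -> None:
        super().__init__(name="uploader", daemon=True)
        self.stream = stream
        self.target = target
        self.read_length = read_length
        self.completed = False
        self._kill = Event()

    def run(self) -> None:
        with open(self.target, "wb") as fp:
            while not self._kill.is_set():
                chunk = self.stream.read(self.read_length)
                if not chunk:  # end of stream: the copy is done
                    self.completed = True
                    break
                fp.write(chunk)

    def kill(self) -> None:
        """Ask the thread to stop before it reads the next chunk."""
        self._kill.set()


target = Path(tempfile.mkdtemp()) / "example.bin"
worker = ChunkedUploadWorker(io.BytesIO(b"y" * 20_000), target)
worker.start()
worker.join()  # wait for the copy to finish
```

If kill() is called while the copy is running, the loop exits at the next chunk boundary and completed stays False, which lets the caller tell a finished upload from a cancelled one.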

winter