
I'm using the cloudfiles module to upload files to Rackspace Cloud Files, with something like this pseudocode:

import cloudfiles

username = '---'
api_key = '---'

conn = cloudfiles.get_connection(username, api_key)
testcontainer = conn.create_container('test')

for f in get_filenames():
    obj = testcontainer.create_object(f)
    obj.load_from_filename(f)

My problem is that I have a lot of small files to upload, and it takes too long this way.

Buried in the documentation, I see that there is a ConnectionPool class, which supposedly can be used to upload files in parallel.

Could someone please show how I can make this piece of code upload more than one file at a time?

Hobhouse

1 Answer


The ConnectionPool class is meant for a multithreaded application that occasionally has to send something to Rackspace.

That way you can reuse your connections, but you don't have to keep 100 connections open if you have 100 threads.
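
A minimal sketch of that usage, assuming a `ConnectionPool(username, api_key)` constructor with `get()`/`put()` methods (worth double-checking against the cloudfiles docs): each thread borrows a connection, uses it, and returns it to the pool.

import cloudfiles

USERNAME = '---'
API_KEY = '---'

# One shared pool; every thread borrows a connection and returns it when done.
pool = cloudfiles.ConnectionPool(USERNAME, API_KEY)

def upload(filename):
    # Called from any worker thread.
    conn = pool.get()
    try:
        container = conn.create_container('test')
        obj = container.create_object(filename)
        obj.load_from_filename(filename)
    finally:
        # Return the connection so other threads can reuse it.
        pool.put(conn)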

You are simply looking for a multithreading/multiprocessing uploader. Here's an example using the multiprocessing library:

import cloudfiles
import multiprocessing

USERNAME = '---'
API_KEY = '---'


def get_container():
    conn = cloudfiles.get_connection(USERNAME, API_KEY)
    testcontainer = conn.create_container('test')
    return testcontainer

def uploader(filenames):
    '''Worker process: upload filenames pulled from the queue until STOP is received'''
    container = get_container()

    # Keep going till you reach STOP
    for filename in iter(filenames.get, 'STOP'):
        # Create the object and upload
        obj = container.create_object(filename)
        obj.load_from_filename(filename)

def main():
    NUMBER_OF_PROCESSES = 16

    # Add your filenames to this queue
    filenames = multiprocessing.Queue()

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        multiprocessing.Process(target=uploader, args=(filenames,)).start()

    # You can keep adding tasks until you add STOP
    filenames.put('some filename')

    # Stop all child processes
    for i in range(NUMBER_OF_PROCESSES):
        filenames.put('STOP')

if __name__ == '__main__':
    multiprocessing.freeze_support()
    main()
Wolph
  • You don't need `multiprocessing` for IO-bound tasks if the `cloudfiles` API is thread-safe. `pool = multiprocessing.Pool(); pool.map(upload_file, get_filenames())` seems like a simpler alternative if you decide to use `multiprocessing` (see the sketch after these comments). – jfs Mar 12 '11 at 15:30
  • @WoLpH: Thank you very much for your answer! When I try your code I run into a `TypeError: 'Queue' object is not iterable`, is this a mistake I have made? – Hobhouse Mar 12 '11 at 18:41
  • @J.F. Sebastian: As I understand it the ConnectionPool class is supposed to be thread-safe. I just can't wrap my head around how to incorporate your code suggestions into the code. – Hobhouse Mar 12 '11 at 18:44
  • @Hobhouse: that could be a problem on my end. Since I don't have a Rackspace account readily available, I was only able to do limited testing. I wrote this code partially based on the `multiprocessing` examples: http://docs.python.org/library/multiprocessing.html#examples I see that `args` is not a tuple anymore; it should be `args=(filenames,)` – Wolph Mar 12 '11 at 20:41
  • @J.F. Sebastian: wouldn't that mean that you are either using a single connection (network IO bound) or a connection per file? Without keeping the connection open for a worker it would be very inefficient. Using a single connection for all workers would also be very ineffective. – Wolph Mar 12 '11 at 20:43
  • @WoLpH: You could use a connection per worker if you cache the connection for each worker https://gist.github.com/53154856be2005d4fa50#file_multiprocessing_pool_cloudfiles.py or you could use ConnectionPool https://gist.github.com/53154856be2005d4fa50#file_connectionpool_cloudfiles.py – jfs Mar 12 '11 at 21:56
  • @J.F. Sebastian: ah yes, caching the connection for each worker is also an option. But personally I find my method cleaner since it can also work in a multithreading environment with a non-threadsafe connection. Your ConnectionPool example works, but it has to recreate the container for every iteration, which is also a waste of resources. Still, very nice alternatives; you should add them as an answer :) – Wolph Mar 13 '11 at 02:14
  • @WoLpH: The access to connection is serialized in both my examples. Your method works in multithreading environment only if `cloudfiles.get_connection()` always returns a new connection which it does (or a thread local). ConnectionPool example could use caching too https://gist.github.com/53154856be2005d4fa50#file_connectionpool_cloudfiles.py So no waste of resources if it matters. – jfs Mar 13 '11 at 08:56
  • @J.F. Sebastian - [both your code examples](https://gist.github.com/53154856be2005d4fa50) work great. I get 100 file uploads in 13-15 seconds with both of them (using 16 processes on a 4-core mac). Does one of them have advantages over the other? @WoLpH: I still get `TypeError: 'Queue' object is not iterable` when I try to run your code, so I can't get it to run. It's perhaps a tiny error somewhere in my code or yours that I don't see. – Hobhouse Mar 13 '11 at 11:10
  • @Hobhouse: `multiprocessing` version is more resilient to innocent code changes but it requires more memory (if the bottleneck is a network latency and not a network bandwidth or a disk speed then you could improve the performance by using a larger pool that in the `multiprocessing` case means noticeably more memory). Make sure you use comma here: `args=(filenames,)` and you use `iter(filenames.get, 'STOP')` for iteration. Rename it to `filename_queue` if you already use `filenames` name somewhere to avoid accidental collisions. – jfs Mar 13 '11 at 11:56
  • @J.F. Sebastian: Thank you for code and debugging - I missed the comma in `args=(filenames,)`. – Hobhouse Mar 13 '11 at 13:03
  • @J.F. Sebastian: you are right, I did not make the assumption that `cloudfiles.get_connection()` is threadsafe. Great examples :) – Wolph Mar 13 '11 at 19:14
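
For reference, the simpler `multiprocessing.Pool` route suggested in the comments, with the connection cached once per worker process as in jfs's gist, could look roughly like this. It is only a sketch reusing the cloudfiles calls from the answer; `get_filenames()` is the same placeholder as in the question.

import multiprocessing

import cloudfiles

USERNAME = '---'
API_KEY = '---'

# Each worker process gets its own container handle, created once by the initializer.
container = None

def init_worker():
    '''Open one connection per worker process and reuse it for every upload.'''
    global container
    conn = cloudfiles.get_connection(USERNAME, API_KEY)
    container = conn.create_container('test')

def upload_file(filename):
    obj = container.create_object(filename)
    obj.load_from_filename(filename)

def get_filenames():
    # Placeholder, as in the question: return the list of files to upload.
    return []

def main():
    pool = multiprocessing.Pool(processes=16, initializer=init_worker)
    pool.map(upload_file, get_filenames())
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()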