There are several answers on Stack Overflow about retrieving an FTP file and writing it to a stream (such as a string buffer or a file) which can then be iterated over.

Such as: Read a file in buffer from FTP python

However, these solutions involve loading the entire file into memory or downloading it to the disk before beginning to process the contents.

I do not have enough memory to buffer the whole file, and I do not have access to the disk. This can be done by processing the data in the callback function, but I want to know whether it's possible to wrap the FTP code in some magic that returns an iterator rather than peppering my code with callbacks.

I.e., rather than:

def get_ftp_data(handle_chunk):
    ...
    ftp.login('user', 'password') # authentication required
    ftp.retrbinary('RETR etc', handle_chunk)
    ...

get_ftp_data(do_stuff_to_chunk)

I want:

for chunk in get_ftp_data():
    do_stuff_to_chunk(chunk)

And (unlike the existing answers) I want to do it without writing the entire FTP file to disk or memory before iterating over it.

Nathan Buesgens
    There is similar question [Turn functions with a callback into Python generators?](http://stackoverflow.com/questions/9968592/turn-functions-with-a-callback-into-python-generators) – Jan Vlcinsky Apr 29 '16 at 18:06

1 Answer

You'll have to put the retrbinary call in another thread and have the callback feed blocks to an iterator:

import threading
import Queue  # on Python 3, this module is named "queue"

def ftp_chunk_iterator(FTP, command):
    # Set maxsize to limit the number of chunks kept in memory at once.
    queue = Queue.Queue(maxsize=some_appropriate_size)

    def ftp_thread_target():
        FTP.retrbinary(command, callback=queue.put)
        queue.put(None)

    ftp_thread = threading.Thread(target=ftp_thread_target)
    ftp_thread.start()

    while True:
        chunk = queue.get()
        if chunk is None:
            return
        yield chunk
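The same queue-based pattern can be exercised without a live FTP server by swapping in any producer that invokes a callback once per chunk. A minimal Python 3 sketch (the `queue` module replaces Python 2's `Queue`; `fake_retr` is a hypothetical stand-in for `FTP.retrbinary`):

```python
import threading
import queue

def fake_retr(command, callback, blocksize=3):
    # Hypothetical stand-in for FTP.retrbinary: feeds fixed data
    # to the callback one block at a time.
    data = b"hello world"
    for i in range(0, len(data), blocksize):
        callback(data[i:i + blocksize])

def chunk_iterator(retr, command, maxsize=8):
    # A bounded queue keeps at most `maxsize` chunks in memory at once.
    q = queue.Queue(maxsize=maxsize)

    def thread_target():
        retr(command, callback=q.put)
        q.put(None)  # sentinel: no more chunks

    threading.Thread(target=thread_target).start()
    # iter() with a sentinel stops once None is dequeued
    return iter(q.get, None)

chunks = list(chunk_iterator(fake_retr, "RETR somefile"))
print(b"".join(chunks))  # → b'hello world'
```

With a real connection you would pass `ftp.retrbinary` (bound to a logged-in `ftplib.FTP` instance) in place of `fake_retr`.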

If you can't use threads, the best you can do is to write your callback as a coroutine:

from contextlib import closing


def process_chunks():
    while True:
        try:
            chunk = yield
        except GeneratorExit:
            finish_up()
            return
        else:
            do_whatever_with(chunk)

with closing(process_chunks()) as coroutine:

    # Advance the coroutine to the first yield
    # (on Python 3, this is spelled next(coroutine))
    coroutine.next()

    FTP.retrbinary(command, callback=coroutine.send)
# coroutine.close() is called automatically on exiting the with block
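The advantage over a plain callback is that the coroutine keeps local state across chunks in one piece of code. A self-contained Python 3 sketch of the same pattern (`fake_retr` is a hypothetical stand-in for `FTP.retrbinary`; `received` just records what happens):

```python
from contextlib import closing

def fake_retr(command, callback, blocksize=4):
    # Hypothetical stand-in for FTP.retrbinary.
    data = b"spam and eggs"
    for i in range(0, len(data), blocksize):
        callback(data[i:i + blocksize])

received = []

def process_chunks():
    # All per-chunk processing lives in this one block, which keeps
    # local state (here, a running total) between chunks.
    total = 0
    while True:
        try:
            chunk = yield
        except GeneratorExit:
            received.append(("total", total))  # finish-up work
            return
        total += len(chunk)
        received.append(chunk)

with closing(process_chunks()) as coroutine:
    next(coroutine)  # advance to the first yield (Python 3 spelling)
    fake_retr("RETR somefile", callback=coroutine.send)
# closing() calls coroutine.close() here, raising GeneratorExit inside it

print(received)
```

A plain function callback could not carry the running total without resorting to globals or a class; the coroutine keeps it as an ordinary local variable.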
user2357112 (edited by Jan Vlcinsky)
  • I was afraid of that. Intuitively though, it doesn't seem like something that should absolutely require threads. Also, while I didn't explicitly state this in the original question, my execution environment doesn't have threads. I hope there's a better way. – Nathan Buesgens Apr 29 '16 at 16:51
  • @natb1: Unfortunately, it does require threads. If you can't use threads, the best you can do is write your callback as a coroutine, and that's less flexible and a lot more mess. – user2357112 Apr 29 '16 at 17:04
  • thanks for introducing me to coroutines. unfortunately that example looks to me like a longer winded way of saying `FTP.retrbinary(command, callback=do_whatever_with)` – Nathan Buesgens Apr 29 '16 at 17:37
  • @natb1: It is if `do_whatever_with` is a simple function, but you can put an arbitrary block of code there with dependence on the state of the coroutine. In cases where it does reduce to `FTP.retrbinary(command, callback=do_whatever_with)`, the iterator would have been unnecessary bloat too. – user2357112 Apr 29 '16 at 17:40
  • @user2357112 I like the threaded version. The coroutine one looks at first glance like a simple callback solution, but there is a significant difference - within the `process_chunks` generator **all the processing (for all chunks) is written within one piece of code which does not return until close()**. Really nice. Proposal: what about putting coroutine creation and closing into a `with` block? – Jan Vlcinsky Apr 29 '16 at 18:29
  • @user2357112 If you like, I could edit your coroutine code by adding `from contextlib import closing` and putting coroutine creation into `with closing(process_chunks()) as coroutine:` block (locally tested and it works) saving the `coroutine.close()` call. – Jan Vlcinsky Apr 29 '16 at 18:42
  • @JanVlcinsky: Sure, go ahead. I'm not sure about what you mean by "does not return until close()", though. – user2357112 Apr 29 '16 at 18:59
  • @user2357112 Done. Revert back, if you feel it was better before. I added very similar answer to [Turn functions with a callback into Python generators?](http://stackoverflow.com/a/36946209/346478) – Jan Vlcinsky Apr 29 '16 at 19:40
  • How can you specify the chunk size? – César Jun 23 '17 at 16:57
  • @César: `FTP.retrbinary` takes an [optional 3rd parameter](https://docs.python.org/3/library/ftplib.html#ftplib.FTP.retrbinary) specifying the maximum chunk size. On Python 3, you can specify it by name as `blocksize`; on Python 2, I believe it's positional-only. – user2357112 Jun 23 '17 at 17:12
  • @user2357112 whoa, how could I miss that? Thank you so much! – César Jun 23 '17 at 17:14