
I'm stuck trying to pipe ftplib.FTP.retrlines into csv.reader.

FTP.retrlines calls a callback once for each line it receives, while csv.reader expects an iterator that returns a string each time its __next__() method is called.
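To make the mismatch concrete, here is a minimal offline sketch. fake_retrlines is a hypothetical stand-in for FTP.retrlines that pushes lines (already stripped of their CRLF, as retrlines delivers them) into a callback; csv.reader, by contrast, pulls strings from any iterable. Bridging the two by collecting everything into a list works, but only after every line has arrived, which is exactly the buffering the question wants to avoid.

```python
import csv


# Hypothetical stand-in for FTP.retrlines: it *pushes* every line,
# without its trailing CRLF, into the callback and returns only at the end.
def fake_retrlines(lines, callback):
    for line in lines:
        callback(line)


# csv.reader *pulls* strings from any iterable (list, generator, file, ...).
rows = list(csv.reader(["id,name", "1,Alice", "2,Bob"]))

# Bridging the two by collecting first works, but csv.reader can only
# start once every line has been delivered, i.e. the whole file is buffered.
collected = []
fake_retrlines(["id,name", "1,Alice"], collected.append)
buffered_rows = list(csv.reader(collected))
```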

How do I combine the two so that I can read and process the file as it arrives, without first downloading the whole file and storing it in, e.g., an io.TextIOWrapper?

My problem is that FTP.retrlines doesn't return until it has consumed the whole file.
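For context on why it blocks: in CPython's ftplib, retrlines runs its own read loop over the data connection and returns only when the transfer is finished, and that loop is built on lower-level public methods (sendcmd, transfercmd, voidresp). One thread-free alternative is therefore to write the loop as a generator. This is a hedged sketch, not part of either answer below: iter_ftp_lines is a hypothetical helper, it assumes plain (non-TLS) FTP, and it must be consumed to the end so the server's final reply is read from the control connection.

```python
from ftplib import FTP


def iter_ftp_lines(ftp: FTP, cmd: str):
    """Yield the lines of an ASCII-mode transfer as they arrive.

    Hypothetical helper modelled on the loop inside FTP.retrlines,
    written as a generator instead of a callback loop. Plain FTP assumed.
    """
    ftp.sendcmd("TYPE A")  # switch to ASCII mode, as retrlines does
    # transfercmd opens the data connection and sends cmd (e.g. "RETR ...").
    # newline="" keeps each line's ending, which csv.reader expects.
    with ftp.transfercmd(cmd) as conn, \
            conn.makefile("r", encoding=ftp.encoding, newline="") as fp:
        yield from fp
    ftp.voidresp()  # read the server's final reply (normally 226)
```

Used as `csv.reader(iter_ftp_lines(ftp, "RETR /path/data.csv"))`, rows are parsed as the data arrives. Keeping the line endings also lets csv.reader handle quoted fields that contain newlines; with the stripped lines that retrlines passes to its callback, those embedded newlines are lost.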

Martin Prikryl
neurino

2 Answers


I'm not sure whether there's a better solution, but you can glue FTP.retrlines and csv.reader together using an iterable, queue-like object. As both functions are synchronous (blocking), you have to run them in parallel on different threads.

Something like this:

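from queue import Queue
from ftplib import FTP
from threading import Thread
import csv

# host, username and password are placeholders for your own server details
ftp = FTP(host)
ftp.login(username, password)

class LineQueue:
    def __init__(self, maxsize=10):
        # Per-instance bounded queue: the download thread blocks once
        # maxsize lines are waiting, so the file is never fully buffered
        self._queue = Queue(maxsize)

    def add(self, s):
        print(f"Queueing line {s}")
        self._queue.put(s)
        print(f"Queued line {s}")

    def done(self):
        print("Signaling Done")
        self._queue.put(None)  # sentinel: no more lines will follow
        print("Signaled Done")

    def __iter__(self):
        print("Reading lines")
        while True:
            print("Reading line")
            s = self._queue.get()
            if s is None:
                print("Read all lines")
                break

            print(f"Read line {s}")
            yield s

q = LineQueue()

def download():
    try:
        ftp.retrlines("RETR /path/data.csv", q.add)
    finally:
        # Always signal the end, even if the transfer fails,
        # so the reading loop below does not block forever
        q.done()

thread = Thread(target=download)
thread.start()

print("Reading CSV")
for entry in csv.reader(q):
    print(entry)

print("Read CSV")

thread.join()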
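The thread-plus-bounded-queue bridge can be tried without an FTP server. This is a minimal sketch, not the answer's code: push_to_pull and fake_retrlines are hypothetical names, and fake_retrlines only stands in for ftp.retrlines by pushing a few hard-coded lines into its callback.

```python
import csv
from queue import Queue
from threading import Thread

_DONE = object()  # unique end-of-stream marker; no real line can be this object


def push_to_pull(producer, maxsize=10):
    """Run producer(callback) on a worker thread and yield each pushed item.

    `producer` is any callable that pushes items into the callback it is
    given, the way ftp.retrlines(cmd, callback) does.
    """
    q = Queue(maxsize)  # bounded: the producer blocks while the reader lags

    def run():
        try:
            producer(q.put)
        finally:
            q.put(_DONE)  # always unblock the reader, even on failure

    worker = Thread(target=run, daemon=True)
    worker.start()
    while True:
        item = q.get()
        if item is _DONE:
            break
        yield item
    worker.join()


# Hypothetical stand-in for ftp.retrlines("RETR /path/data.csv", callback)
def fake_retrlines(callback):
    for line in ["id,name", "1,Alice", "2,Bob"]:
        callback(line)


rows = list(csv.reader(push_to_pull(fake_retrlines)))
```

With a real connection the producer would be something like `lambda cb: ftp.retrlines("RETR /path/data.csv", cb)`. One caveat of this design: if the reader stops iterating early, the worker stays blocked on `put` once the queue is full; making it a daemon thread at least keeps it from holding up interpreter exit.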
Martin Prikryl
This works like a charm, thank you! I ended up subclassing Queue directly for the sake of brevity and am posting my code below in case it's of any help. – neurino Feb 09 '21 at 14:19

Same solution as Martin's; I just saved a few lines of code by subclassing queue.Queue directly.

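from queue import Queue
from ftplib import FTP
from threading import Thread
import csv

# ftp_credentials (e.g. host, user, passwd) and fname are placeholders;
# when user is passed, FTP() also logs in
ftp = FTP(**ftp_credentials)

class LineQueue(Queue):
    def __iter__(self):
        # Yield lines until the None sentinel arrives
        while True:
            s = self.get()
            if s is None:
                break
            yield s

    def __call__(self):
        # Runs on the worker thread: retrlines calls self.put once per line
        try:
            ftp.retrlines(f"RETR {fname}", self.put)
        finally:
            self.put(None)  # end-of-file sentinel, sent even on error

q = LineQueue(10)  # bounded, so the download pauses when the reader falls behind

thread = Thread(target=q)
thread.start()

for entry in csv.reader(q):
    print(entry)

thread.join()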
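One thing the subclass does not handle is a failed transfer: if retrlines raises on the worker thread, the exception is only reported by that thread, and the reader either waits forever for the end marker or, with a try/finally around the transfer, stops as if the file had simply ended. Below is a hedged variant of the same subclass idea that re-raises the failure in the reading thread. The producer constructor argument, the _END marker and the _error attribute are additions not in the original, and fake_transfer is a hypothetical stand-in for ftp.retrlines.

```python
import csv
from queue import Queue
from threading import Thread


class LineQueue(Queue):
    """Queue filled by a producer thread and iterated by the reader.

    Variant of the subclass above: a failure in the producer is passed
    through to the reading thread and re-raised there.
    """
    _END = object()  # end-of-stream marker (never a real line)

    def __init__(self, producer, maxsize=10):
        super().__init__(maxsize)
        self._producer = producer  # callable taking a per-line callback
        self._error = None

    def __iter__(self):
        while True:
            s = self.get()
            if s is self._END:
                break
            yield s
        if self._error is not None:
            raise self._error  # surface the worker's failure to the reader

    def __call__(self):
        try:
            self._producer(self.put)
        except Exception as exc:
            self._error = exc  # read by __iter__ after the marker arrives
        finally:
            self.put(self._END)


# Hypothetical stand-in for ftp.retrlines(f"RETR {fname}", callback)
def fake_transfer(callback):
    for line in ["id,name", "1,Alice"]:
        callback(line)


q = LineQueue(fake_transfer)
thread = Thread(target=q)
thread.start()
rows = list(csv.reader(q))
thread.join()
```

With a real connection the producer would be `lambda cb: ftp.retrlines(f"RETR {fname}", cb)`; a dropped connection then surfaces as an exception from the `for entry in csv.reader(q)` loop instead of a silent early end.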
neurino