
I am using Python boto to interact with S3. The files I have on S3 are CSVs, and I'd like to read them line by line through a buffer so that memory usage stays bounded.

Does anyone know a way of composing Python's io classes to achieve this? The goal is an abstraction that wraps a boto Key and provides a readline or iterator interface over it (the key itself only provides a read(size=0) call). The complication is that, since the data is CSV, each row has a variable length.

I wanted an abstraction that wraps a boto Key and implements the iterator protocol so that I could pass it to csv.reader. I ended up implementing this myself.

It looks like Python's io module has all the pieces to do this (BufferedReader and TextIOWrapper). I experimented by naively passing the boto Key to BufferedReader, but it expects an IOBase object.

I then implemented the IOBase protocol around the Key but got unicode errors, and in general wasn't sure what I was doing.

Does anyone know whether Python's io module can do what's described above?


Technical specs:

There is a directory of 1-100 CSV files on S3. All have the same format but a variable number of rows. I am trying to implement a function that takes an iterator of boto Keys.

Key provides a read(num_bytes) method.

def yield_lines(keys_iterator):
    # had to custom implement this
    # any way using io??
    # yield each CSV row across keys that only provide `read()` method
    raise NotImplementedError  # placeholder for the custom implementation

My initial attempt was to make the boto Key adhere to IOBase. I would compose it with a BufferedReader and then try to read lines from it using a TextIOWrapper, but I ran into encoding issues with readinto.

from io import BufferedReader, TextIOWrapper

class IOCompatibleKey(object):

    def __init__(self, s3_key):
        self.s3_key = s3_key

    def readable(self):
        return True

    def writable(self):
        return False

    def read(self, num_bytes):
        return self.s3_key.read(num_bytes)

    def readinto(self, n):
        # .... ?????
        raise NotImplementedError

buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
text_reader = TextIOWrapper(buffered_reader)
for line in text_reader: # <- IS THIS POSSIBLE????
    print(line)
dm03514

1 Answer


In Python 2, you want to avoid a TextIOWrapper object, as the csv.reader() object expects a bytestring instead. It can't handle the unicode objects TextIOWrapper provides.

Providing an IOBase implementation is otherwise simple enough:

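class IOCompatibleKey(object):
    """Expose a boto Key through the methods BufferedReader needs."""

    def __init__(self, s3_key):
        self.s3_key = s3_key

    def readable(self):
        return True

    def writable(self):
        # io.IOBase spells this method `writable`
        return False

    @property
    def closed(self):
        return self.s3_key.closed

    def close(self):
        self.s3_key.close()

    def read(self, num_bytes):
        return self.s3_key.read(num_bytes)

    def readinto(self, n):
        # n is a pre-allocated, writable buffer: fill it with at most
        # len(n) bytes and report how many bytes were actually written
        chunk = self.s3_key.read(len(n))
        read = len(chunk)
        n[:read] = chunk
        return read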

and only use a BufferedReader when using Python 2:

import csv
from io import BufferedReader

buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
csv_reader = csv.reader(buffered_reader)
for row in csv_reader:
    print(row)

On Python 3, add a TextIOWrapper() on top of the BufferedReader(), because csv.reader() expects text (str) lines there.
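As a rough illustration of the Python 3 variant, here is a minimal sketch rather than a tested boto integration. It assumes the key only offers read(size) and close(); `FakeKey`, `RawKeyReader` and `iter_csv_rows` are hypothetical names made up for this example. The adapter subclasses io.RawIOBase instead of a plain object, since that base class already supplies seekable(), writable() and closed, which TextIOWrapper may query through the BufferedReader.

```python
import csv
import io


class FakeKey(object):
    """Hypothetical stand-in for a boto Key: only read(size) and close()."""

    def __init__(self, data):
        self._stream = io.BytesIO(data)
        self.closed = False

    def read(self, size):
        # Returns at most `size` bytes, and b'' once the data is exhausted.
        return self._stream.read(size)

    def close(self):
        self.closed = True


class RawKeyReader(io.RawIOBase):
    """Raw byte stream over a key; RawIOBase fills in the rest of the API."""

    def __init__(self, s3_key):
        super().__init__()
        self.s3_key = s3_key

    def readable(self):
        return True

    def readinto(self, buf):
        # Copy at most len(buf) bytes into the caller's buffer.
        chunk = self.s3_key.read(len(buf))
        n = len(chunk)
        buf[:n] = chunk
        return n  # 0 signals end of stream

    def close(self):
        # Close the wrapped key once, then mark this stream as closed.
        if not self.closed:
            self.s3_key.close()
        super().close()


def iter_csv_rows(s3_key, encoding="utf-8", buffer_size=8192):
    """Yield CSV rows from a key without loading the whole object."""
    buffered = io.BufferedReader(RawKeyReader(s3_key), buffer_size=buffer_size)
    # newline="" is what the csv module documentation recommends for its input.
    text = io.TextIOWrapper(buffered, encoding=encoding, newline="")
    for row in csv.reader(text):
        yield row


key = FakeKey(b"id,name\n1,alpha\n2,\xc3\xa9t\xc3\xa9\n")
rows = list(iter_csv_rows(key))
print(rows)
```

The encoding is an assumption here (UTF-8). TextIOWrapper decodes incrementally, so a multi-byte character split across two reads should still decode correctly.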

Demo in Python 2, using a mocked key:

>>> import random, csv
>>> from io import BufferedReader
>>> class Key(object):
...     closed = False
...     def read(self, bytes=1024):
...         if random.random() < 0.2:
...             bytes = random.randrange(bytes)
...         return ''.join([random.choice('abcdefghijklmnopqrstuvwxyz \n,') for _ in range(bytes)])
...
>>> s3_key = Key()
>>> buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
>>> next(buffered_reader)   # produces a single \n terminated line
'nffdahuitmdaktibxjsdgyhlyfm gurfyo,nt\n'
>>> reader = csv.reader(buffered_reader)  # which satisfies csv.reader
>>> next(reader)
['bi iydribq', 'u']
>>> next(reader)
['qzxtbhkk se', 'v', 'b', 'nunyjemtkxaphuqmvgfrfjdloxwohqamdtvfqgddfna cjuzpaotccenxhhhgnvrbey']
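For the yield_lines(keys_iterator) signature from the question, one possible approach is to wrap each key in turn and chain the resulting lines. The following is only a sketch for Python 3: `FakeKey` and `KeyRawIO` are hypothetical helpers, UTF-8 is an assumed encoding, and the keys are not closed here.

```python
import csv
import io


class FakeKey(object):
    """Hypothetical stand-in for a boto Key that only offers read(size)."""

    def __init__(self, data):
        self._stream = io.BytesIO(data)

    def read(self, size):
        return self._stream.read(size)


class KeyRawIO(io.RawIOBase):
    """Minimal raw stream adapter around an object with read(size)."""

    def __init__(self, s3_key):
        super().__init__()
        self.s3_key = s3_key

    def readable(self):
        return True

    def readinto(self, buf):
        chunk = self.s3_key.read(len(buf))
        buf[:len(chunk)] = chunk
        return len(chunk)


def yield_lines(keys_iterator, encoding="utf-8"):
    """Yield decoded lines from every key, one key at a time."""
    for s3_key in keys_iterator:
        # Each key gets its own reader stack, so an unterminated last
        # line in one file is never glued to the first line of the next.
        text = io.TextIOWrapper(
            io.BufferedReader(KeyRawIO(s3_key)),
            encoding=encoding,
            newline="",
        )
        for line in text:
            yield line


keys = [FakeKey(b"a,b\n1,2\n"), FakeKey(b"3,4\n5,6")]
rows = list(csv.reader(yield_lines(iter(keys))))
print(rows)
```

If every file starts with a header row, each header comes through as an ordinary row, so the caller would have to skip the repeats.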
Martijn Pieters
  • Thanks for this answer. It's great. I take it then that it's not possible to do this in a way that is compatible with both? – Pablo Feb 13 '19 at 00:38
  • @Pablo the Python 2 `csv.reader()` implementation is just too different to make a one-size-fits-both solution work. Just use an `if PY3:` test and add the `TextIOWrapper()` wrapping conditionally. – Martijn Pieters Feb 13 '19 at 08:54
  • fair enough. Thanks! – Pablo Feb 13 '19 at 18:18
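The conditional wrapping suggested in the comments could look roughly like the sketch below. `PY3`, `KeyRawIO`, `FakeKey` and `open_csv_reader` are names invented for this illustration, and the UTF-8 encoding is an assumption.

```python
import csv
import io
import sys

PY3 = sys.version_info[0] >= 3


class FakeKey(object):
    """Hypothetical stand-in for a boto Key that only offers read(size)."""

    def __init__(self, data):
        self._stream = io.BytesIO(data)

    def read(self, size):
        return self._stream.read(size)


class KeyRawIO(io.RawIOBase):
    """Raw stream adapter; io.RawIOBase exists on Python 2 and 3."""

    def __init__(self, s3_key):
        super(KeyRawIO, self).__init__()
        self.s3_key = s3_key

    def readable(self):
        return True

    def readinto(self, buf):
        chunk = self.s3_key.read(len(buf))
        buf[:len(chunk)] = chunk
        return len(chunk)


def open_csv_reader(s3_key, encoding="utf-8"):
    """Build a csv.reader over a key, adding text decoding only on Python 3."""
    stream = io.BufferedReader(KeyRawIO(s3_key))
    if PY3:
        # Python 3's csv.reader wants str lines; Python 2's wants bytestrings.
        stream = io.TextIOWrapper(stream, encoding=encoding, newline="")
    return csv.reader(stream)


rows = list(open_csv_reader(FakeKey(b"x,y\n1,2\n")))
print(rows)
```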