I have a generator producing a list of strings. Is there a utility/adapter in Python that could make it look like a file?

For example,

>>> def str_fn():
...     for c in 'a', 'b', 'c':
...         yield c * 3
... 
>>> for s in str_fn():
...     print s
... 
aaa
bbb
ccc
>>> stream = some_magic_adaptor(str_fn())
>>> while True:
...    data = stream.read(4)
...    if not data:
...        break
...    print data
aaab
bbcc
c

Because data may be big and needs to be streamable (each fragment is a few kilobytes, the entire stream is tens of megabytes), I do not want to eagerly evaluate the whole generator before passing it to stream adaptor.

Alex B

8 Answers


The "correct" way to do this is to inherit from one of the standard Python io abstract base classes. However, Python doesn't appear to let you supply only a raw text class and have it wrapped with a buffered reader of any kind.

The best class to inherit from is TextIOBase. Here's such an implementation, handling readline and read while being mindful of performance. (gist)

import io

class StringIteratorIO(io.TextIOBase):

    def __init__(self, iter):
        self._iter = iter
        self._left = ''

    def readable(self):
        return True

    def _read1(self, n=None):
        while not self._left:
            try:
                self._left = next(self._iter)
            except StopIteration:
                break
        ret = self._left[:n]
        self._left = self._left[len(ret):]
        return ret

    def read(self, n=None):
        l = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                l.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                l.append(m)
        return ''.join(l)

    def readline(self):
        l = []
        while True:
            i = self._left.find('\n')
            if i == -1:
                l.append(self._left)
                try:
                    self._left = next(self._iter)
                except StopIteration:
                    self._left = ''
                    break
            else:
                l.append(self._left[:i+1])
                self._left = self._left[i+1:]
                break
        return ''.join(l)
Matt Joiner

Here's a solution that should read from your iterator in chunks.

class some_magic_adaptor:
  def __init__( self, it ):
    self.it = it
    self.next_chunk = ""
  def growChunk( self ):
    self.next_chunk = self.next_chunk + next(self.it)
  def read( self, n ):
    if self.next_chunk is None:
      return None
    try:
      while len(self.next_chunk)<n:
        self.growChunk()
      rv = self.next_chunk[:n]
      self.next_chunk = self.next_chunk[n:]
      return rv
    except StopIteration:
      rv = self.next_chunk
      self.next_chunk = None
      return rv


def str_fn():
  for c in 'a', 'b', 'c':
    yield c * 3

ff = some_magic_adaptor( str_fn() )

while True:
  data = ff.read(4)
  if not data:
    break
  print data
Michael Anderson
  • Not as built-in as I'd like, but still does the job well. :thumbsup: – Alex B Sep 26 '12 at 10:06
  • Not as complete as using an `io` ABC, and there's a performance hit when adding lots of yielded strings in this fashion. Better to `join` them in one hit only to satisfy the interface, but nice regardless. – Matt Joiner Sep 27 '12 at 05:09
  • Great! I had to add "some_magic_adaptor.readline()" copying "read()" and replacing "while len(self.next_chunk) – Pietro Battiston Jan 23 '15 at 17:42

The problem with StringIO is that you have to load everything into the buffer up front. This can be a problem if the generator is infinite :)

from itertools import chain, islice
class some_magic_adaptor(object):
    def __init__(self, src):
        self.src = chain.from_iterable(src)
    def read(self, n):
        return "".join(islice(self.src, None, n))
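As a quick self-contained check (the class is repeated here so the snippet runs on its own), this reproduces the exact session from the question:

```python
from itertools import chain, islice

class some_magic_adaptor(object):
    def __init__(self, src):
        # flatten the iterable of strings into one stream of characters
        self.src = chain.from_iterable(src)

    def read(self, n):
        # pull at most n characters off the character stream
        return "".join(islice(self.src, None, n))

def str_fn():
    for c in 'a', 'b', 'c':
        yield c * 3

stream = some_magic_adaptor(str_fn())
print(stream.read(4))  # aaab
print(stream.read(4))  # bbcc
print(stream.read(4))  # c
```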
John La Rooy
  • Hm, I'd love to use it this way, the problem is that this will yield characters one by one. In my stream, each fragment is in kilobytes, the entire stream is in megabytes, which (given CPython's pitiful nominal code performance) may create a bottleneck where it could be easily avoided. – Alex B Sep 26 '12 at 02:25
  • @AlexB, Perhaps you can write up the results when you try out the various alternatives. I will be interested to see the results. – John La Rooy Sep 26 '12 at 02:48
  • @gnibbler In a purely synthetic microbenchmark I get about 20x slowdown in CPython and 100x in PyPy because its optimiser isn't itertools-friendly, compared to chunked solution in the other answer, which is similar in performance to straight up read from StringIO. In our prod code related tasks take ~50% longer (not fatal, but not trivial either). The setup was still more artificial than I'd like, but running a real prod-like test is a bit of a hassle for me right now. See synthetic test code: https://gist.github.com/3786538 – Alex B Sep 26 '12 at 10:04

Here's a modified version of John's and Matt's answers that can read a list/generator of strings and output bytearrays:

import itertools as it
from io import TextIOBase

class IterStringIO(TextIOBase):
    def __init__(self, iterable=None):
        iterable = iterable or []
        self.iter = it.chain.from_iterable(iterable)

    def not_newline(self, s):
        return s not in {'\n', '\r', '\r\n'}

    def write(self, iterable):
        to_chain = it.chain.from_iterable(iterable)
        self.iter = it.chain.from_iterable([self.iter, to_chain])

    def read(self, n=None):
        return bytearray(it.islice(self.iter, None, n))

    def readline(self, n=None):
        to_read = it.takewhile(self.not_newline, self.iter)
        return bytearray(it.islice(to_read, None, n))

usage:

ff = IterStringIO(c * 3 for c in ['a', 'b', 'c'])

while True:
    data = ff.read(4)

    if not data:
        break

    print data

aaab
bbcc
c

alternate usage:

ff = IterStringIO()
ff.write('ddd')
ff.write(c * 3 for c in ['a', 'b', 'c'])

while True:
    data = ff.read(4)

    if not data:
        break

    print data

ddda
aabb
bccc
reubano

There is one called werkzeug.contrib.iterio.IterIO, but note that it keeps everything it has read so far in memory (up to the point you have read it as a file), so it might not be suitable.

http://werkzeug.pocoo.org/docs/contrib/iterio/

Source: https://github.com/mitsuhiko/werkzeug/blob/master/werkzeug/contrib/iterio.py

An open bug on readline/iter: https://github.com/mitsuhiko/werkzeug/pull/500

r3m0t

Looking at Matt's answer, I can see that it's not always necessary to implement all the read methods; read1 may be sufficient. It is described as:

Read and return up to size bytes, with at most one call to the underlying raw stream’s read()...

Then it can be wrapped with io.TextIOWrapper, which, for instance, has an implementation of readline. As an example, here's streaming of a CSV file from Amazon S3 (Simple Storage Service) via boto.s3.key.Key, which implements an iterator for reading.

import io
import csv

from boto import s3


class StringIteratorIO(io.TextIOBase):

    def __init__(self, iter):
        self._iterator = iter
        self._buffer = ''

    def readable(self):
        return True

    def read1(self, n=None):
        while not self._buffer:
            try:
                self._buffer = next(self._iterator)
            except StopIteration:
                break
        result = self._buffer[:n]
        self._buffer = self._buffer[len(result):]
        return result


conn = s3.connect_to_region('some_aws_region')
bucket = conn.get_bucket('some_bucket')
key = bucket.get_key('some.csv')    

fp = io.TextIOWrapper(StringIteratorIO(key))
reader = csv.DictReader(fp, delimiter = ';')
for row in reader:
    print(row)

Update

Here's an answer to a related question which looks a little better. It inherits io.RawIOBase and overrides readinto. In Python 3 that's sufficient, so instead of wrapping IterStream in io.BufferedReader one can wrap it in io.TextIOWrapper. In Python 2 read1 is also needed, but it can be simply expressed through readinto.
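A minimal sketch of that readinto approach, assuming an iterable of bytes chunks (class and variable names here are mine, not from the linked answer):

```python
import io

class IterStream(io.RawIOBase):
    """Expose an iterable of bytes chunks as an unbuffered binary stream."""

    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._leftover = b''

    def readable(self):
        return True

    def readinto(self, b):
        try:
            # serve any carried-over tail first, otherwise pull a new chunk
            chunk = self._leftover or next(self._iterator)
        except StopIteration:
            return 0  # zero bytes written signals EOF
        output, self._leftover = chunk[:len(b)], chunk[len(b):]
        b[:len(output)] = output
        return len(output)

# In Python 3, io.TextIOWrapper can wrap the raw stream directly and
# provides readline() and line iteration on top of it:
chunks = iter([b'spam;1\neggs', b';2\n'])
fp = io.TextIOWrapper(IterStream(chunks), encoding='utf-8')
print(fp.readline())  # spam;1
```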

saaj

If you only need a read method, then this can be enough

from io import IOBase

def to_file_like_obj(iterable, base):
    chunk = base()
    offset = 0
    it = iter(iterable)

    def up_to_iter(size):
        nonlocal chunk, offset

        while size:
            if offset == len(chunk):
                try:
                    chunk = next(it)
                except StopIteration:
                    break
                else:
                    offset = 0
            to_yield = min(size, len(chunk) - offset)
            offset = offset + to_yield
            size -= to_yield
            yield chunk[offset - to_yield : offset]

    class FileLikeObj(IOBase):
        def readable(self):
            return True

        def read(self, size=-1):
            return base().join(
                up_to_iter(float('inf') if size is None or size < 0 else size)
            )

    return FileLikeObj()

which can be used for an iterable yielding str

my_file = to_file_like_obj(str_fn(), str)

or if you have an iterable yielding bytes rather than str, and you want a file-like object whose read method returns bytes

my_file = to_file_like_obj(bytes_fn(), bytes)

This pattern has a few nice properties I think:

  • Not much code, which can be used for both str and bytes
  • Returns exactly what has been asked for in terms of length, in both of the cases of the iterable yielding small chunks, and big chunks (other than at the end of the iterable)
  • Does not append str/bytes - so avoids copying
  • Leverages slicing - so also avoids copying because a slice of a str/bytes that should be the entire instance will return exactly that same instance
  • For the bytes case, it's enough of a file-like object to pass through to boto3's upload_fileobj for multipart upload to S3
  • For the str case, can also be used for psycopg2's copy_expert / psycopg3's copy
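As a self-contained check of the exact-length claim (the function is repeated verbatim so the snippet runs on its own; the sample chunks are my own), uneven chunks still come back in exact 4-byte reads:

```python
from io import IOBase

def to_file_like_obj(iterable, base):
    chunk = base()
    offset = 0
    it = iter(iterable)

    def up_to_iter(size):
        nonlocal chunk, offset

        while size:
            if offset == len(chunk):
                try:
                    chunk = next(it)
                except StopIteration:
                    break
                else:
                    offset = 0
            to_yield = min(size, len(chunk) - offset)
            offset = offset + to_yield
            size -= to_yield
            yield chunk[offset - to_yield : offset]

    class FileLikeObj(IOBase):
        def readable(self):
            return True

        def read(self, size=-1):
            return base().join(
                up_to_iter(float('inf') if size is None or size < 0 else size)
            )

    return FileLikeObj()

# chunks of length 1, 7, and 1 still come back as exact 4-byte reads
my_file = to_file_like_obj([b'a', b'aabbbcc', b'c'], bytes)
print(my_file.read(4))  # b'aaab'
print(my_file.read(4))  # b'bbcc'
print(my_file.read(4))  # b'c'
```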
Michal Charemza

This is exactly what StringIO is for:

>>> import StringIO
>>> some_var = StringIO.StringIO("Hello World!")
>>> some_var.read(4)
'Hell'
>>> some_var.read(4)
'o Wo'
>>> some_var.read(4)
'rld!'
>>>

Or, if you want to do what it sounds like:

class MyString(StringIO.StringIO):
    def __init__(self, *args):
        StringIO.StringIO.__init__(self, "".join(args))

then you can simply

xx = MyString(*list_of_strings)
Joran Beasley
  • After edit: This evaluates the whole list of strings upfront (see the last sentence of the question). – Alex B Sep 26 '12 at 02:22