I'm trying to read a huge csv.gz file from a URL in chunks and write it into a database on the fly. It all has to happen in memory; no data can exist on disk.

I have the below generator function, which parses the response into DataFrame chunks and yields them (a sketch of the database-writing side follows the code).

It works when using the requests response.raw as input to pd.read_csv, but it appears unreliable and can sometimes throw a connection-reset error: urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(10054, \'WSAECONNRESET\')",)', OSError("(10054, 'WSAECONNRESET')",))

response = session.get(target, stream=True)
df_it = pd.read_csv(response.raw, compression='gzip', chunksize=10**6, 
                    header=None, dtype=str, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
    if df.empty:
        continue
    if (i % 10) == 0:
        time.sleep(10)
    yield df
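
For context, the DataFrame chunks yielded by this generator are written to the database roughly like this (simplified sketch; the SQLAlchemy engine, connection string, table name, and generator method name are placeholders, not my real code):

from sqlalchemy import create_engine

# Placeholder connection string and table name.
engine = create_engine('postgresql://user:password@host:5432/mydb')

# stream_dataframes() stands in for the generator shown above.
for df in self.stream_dataframes():
    # Append each in-memory chunk straight to the database; nothing touches disk.
    df.to_sql('target_table', engine, if_exists='append', index=False)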

I decided to use iter_content instead, as I read that it should be more reliable. I implemented the below, but I'm getting this error: EOFError: Compressed file ended before the end-of-stream marker was reached.

I think it's because each chunk is only a slice of the compressed stream, so the gzip end-of-stream marker is missing, but I'm not sure how to pass pandas.read_csv an object it will accept (a minimal illustration of the error follows the code below).

response = session.get(target, stream=True)
for chunk in response.iter_content(chunk_size=10**6):
    file_obj = io.BytesIO()
    file_obj.write(chunk)
    file_obj.seek(0)
    df_it = pd.read_csv(file_obj, compression='gzip', dtype=str,
                        header=None, names=columns, parse_dates=['datetime'])
    for i, df in enumerate(self.process_df(df_it)):
        if df.empty:
            continue
        if (i % 10) == 0:
            time.sleep(10)
        yield df
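
Here is a minimal illustration of what I think is happening (synthetic data, not my actual file): decompressing only part of a gzip stream raises exactly this EOFError.

import gzip
import io

# Build a small gzip payload, then truncate it to mimic a single iter_content() chunk.
payload = gzip.compress(b'col1,col2,col3\n' * 100000)
partial = payload[: len(payload) // 2]

# Raises: EOFError: Compressed file ended before the end-of-stream marker was reached
gzip.GzipFile(fileobj=io.BytesIO(partial)).read()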

Any ideas greatly appreciated!

Thanks

ChatNoir

1 Answer

You may wish to try this:

def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
    input stream.

    The stream implements Python 3's newer I/O API (available in Python 2's io module).
    For efficiency, the stream is buffered.
    """
    class IterStream(io.RawIOBase):
        def __init__(self):
            self.leftover = None
        def readable(self):
            return True
        def readinto(self, b):
            try:
                l = len(b)  # We're supposed to return at most this much
                chunk = self.leftover or next(iterable)
                output, self.leftover = chunk[:l], chunk[l:]
                b[:len(output)] = output
                return len(output)
            except StopIteration:
                return 0    # indicate EOF
    return io.BufferedReader(IterStream(), buffer_size=buffer_size)

Then

response = session.get(target, stream=True)
response.raw.decode_content = True  # let urllib3 decode per the Content-Encoding header
df = pd.read_csv(iterable_to_stream(response.iter_content()), sep=';')

I use this to stream csv files in odsclient. It seems to work, although I did not try with gz compression.
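
For the gzipped, chunked case in the question (target and columns as defined there), something along these lines may work (untested sketch; chunk_size is arbitrary, and if the server sends Content-Encoding: gzip the body may already arrive decompressed, in which case compression='gzip' should be dropped):

import pandas as pd
import requests

session = requests.Session()
response = session.get(target, stream=True)

# iter_content() yields raw body chunks; iterable_to_stream() wraps them in a
# lazy file-like object, so nothing is materialised on disk.
stream = iterable_to_stream(response.iter_content(chunk_size=1024 * 1024))

df_it = pd.read_csv(stream, compression='gzip', chunksize=10**6,
                    header=None, dtype=str, names=columns,
                    parse_dates=['datetime'])
for df in df_it:
    ...  # process / write each DataFrame chunk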

Source: https://stackoverflow.com/a/20260030/7262247

smarie
  • Nice! But beware of `response.iter_content()`. For me it had a default `chunk_size` of 1. Increasing the `chunk_size` considerably sped things up for me. – Biesi Jul 20 '22 at 11:42
  • Thanks @BiesiGrr! I did not notice it previously – smarie Jul 20 '22 at 13:38