I'm trying to read a huge csv.gz file from a URL in chunks and write it to a database on the fly. Everything has to happen in memory; no data can exist on disk.
I have the generator function below, which streams the response into DataFrame chunks.
It works when I pass the request's response.raw to the pd.read_csv function, but it appears unreliable and can sometimes fail with a connection-reset error: urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(10054, \'WSAECONNRESET\')",)', OSError("(10054, 'WSAECONNRESET')",))
    response = session.get(target, stream=True)
    df_it = pd.read_csv(response.raw, compression='gzip', chunksize=10**6,
                        header=None, dtype=str, names=columns, parse_dates=['datetime'])
    for i, df in enumerate(self.process_df(df_it)):
        if df.empty:
            continue
        if (i % 10) == 0:
            time.sleep(10)
        yield df
I decided to use iter_content instead, as I read it should be more reliable. I implemented the version below, but now I'm getting this error: EOFError: Compressed file ended before the end-of-stream marker was reached
I think it's because I'm passing in an incomplete compressed bytes object, but I'm not sure how to give pandas.read_csv an object it will accept.
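To check my hypothesis without the network, I tried decompressing a deliberately truncated gzip byte string, and it raises the same error (the slice simulates a single iter_content() chunk that ends mid-stream):

```python
import gzip

# Compress some CSV-ish data, then feed back only the first half,
# simulating one chunk cut off before the gzip end-of-stream marker.
payload = gzip.compress(b"a,b,c\n" * 1000)

try:
    gzip.decompress(payload[: len(payload) // 2])
except EOFError as exc:
    print(exc)  # Compressed file ended before the end-of-stream marker was reached
```

So each chunk on its own is not a valid gzip file, which seems to match the error I'm seeing.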
    response = session.get(target, stream=True)
    for chunk in response.iter_content(chunk_size=10**6):
        file_obj = io.BytesIO()
        file_obj.write(chunk)
        file_obj.seek(0)
        df_it = pd.read_csv(file_obj, compression='gzip', dtype=str,
                            header=None, names=columns, parse_dates=['datetime'])
        for i, df in enumerate(self.process_df(df_it)):
            if df.empty:
                continue
            if (i % 10) == 0:
                time.sleep(10)
            yield df
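My current suspicion is that I need to keep one decompressor alive across all the chunks, rather than treating each chunk as a standalone gzip file. A rough sketch of what I mean (decompressed_chunks is my own hypothetical helper name, not tested against the real feed):

```python
import zlib

def decompressed_chunks(response, chunk_size=2**20):
    # A single decompressor shared across all chunks preserves the deflate
    # state, so chunk boundaries falling mid-block stop being a problem.
    # wbits=16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    decompressor = zlib.decompressobj(wbits=16 + zlib.MAX_WBITS)
    for chunk in response.iter_content(chunk_size=chunk_size):
        data = decompressor.decompress(chunk)
        if data:
            yield data
    tail = decompressor.flush()
    if tail:
        yield tail
```

I'm not sure whether this is the right direction, or how best to turn the yielded byte chunks back into something pd.read_csv can consume.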
Any ideas greatly appreciated!
Thanks