
It is simple to get a StorageStreamDownloader using the azure.storage.blob package:

from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient.from_connection_string("my azure connection string")
container_client = blob_service_client.get_container_client("my azure container name")
blob_client = container_client.get_blob_client("my azure file name")
storage_stream_downloader = blob_client.download_blob()

and it is simple to process a file-like object (or, more precisely, I think, any string-returning iterator, or the file path of the object) with the csv package:

import csv
from io import StringIO
 
csv_string = """col1, col2
a,b
c,d"""
with StringIO(csv_string) as csv_file:
  for row in csv.reader(csv_file):
    print(row) # or rather whatever I actually want to do on a row by row basis, e.g. ascertain that the file contains a row that meets a certain condition

What I'm struggling with is getting the streaming data from my StorageStreamDownloader into csv.reader() in such a way that I can process each line as it arrives rather than waiting for the whole file to download.

The Microsoft docs strike me as a little underwritten by their standards (the chunks() method has no annotation?) but I see there is a readinto() method for reading into a stream. I have tried reading into a BytesIO stream but cannot work out how to get the data out into csv.reader() without just outputting the buffer to a new file and reading that file. This all strikes me as a thing that should be doable but I'm probably missing something obvious conceptually, perhaps to do with itertools or asyncio, or perhaps I'm just using the wrong csv tool for my needs?
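
To illustrate the shape of solution I'm imagining, something like the following untested sketch on my part (assuming chunks() yields bytes and the file is UTF-8) is what I mean by processing rows as they arrive:

import codecs
import csv

# Untested sketch: turn the byte chunks into an iterator of text lines,
# so csv.reader can consume rows as they arrive. The incremental decoder
# copes with multi-byte characters split across chunk boundaries.
def iter_csv_lines(downloader):
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    for chunk in downloader.chunks():  # chunks() yields bytes
        buffer += decoder.decode(chunk)
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line + "\n"  # keep the newline for quoted multi-line fields
    buffer += decoder.decode(b"", final=True)
    if buffer:
        yield buffer  # last line may lack a trailing newline

for row in csv.reader(iter_csv_lines(storage_stream_downloader)):
    print(row)  # or whatever I actually want to do on a row by row basis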

  • You can use pandas to read a CSV file with BytesIO. – Jim Xu Feb 05 '21 at 06:45
  • That's really helpful, thanks Jim. I was trying to stick to the specialised csv library as I'm not doing any actual data-point analysis, but if pandas handles it then I'll give that a go. – Christopher Alcock Feb 05 '21 at 11:03

2 Answers


Based on a comment by Jim Xu:

import io

import pandas as pd

stream = blob_client.download_blob()
with io.BytesIO() as buf:
    stream.readinto(buf)

    # needed to reset the buffer; otherwise pandas won't read from the start
    buf.seek(0)

    data = pd.read_csv(buf)

or

csv_content = blob_client.download_blob().readall()
data = pd.read_csv(io.BytesIO(csv_content))
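
Note that both variants download the entire blob before pandas parses it (readall() pulls everything into memory, and readinto() fills the whole buffer before the seek), so this parses the CSV from memory rather than processing rows as they stream in.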
– Métoule

If you want to read a CSV file row by row, you can use the method pd.read_csv(filename, chunksize=1). For more details, please refer to here and here

For example (I use pandas 1.2.1)

import pandas as pd

# `content` is the CSV source: a file path or a file-like object such as a BytesIO buffer
with pd.read_csv(content, chunksize=1) as reader:
    for chunk in reader:
        print(chunk)
        print('---------------')


Besides, if you want to use the method chunks(), you need to set max_chunk_get_size and max_single_get_size to the same value when you create the BlobClient. For more details, please refer to here and here

For example

from azure.storage.blob import BlobClient

key = '<account_key>'

blob_client = BlobClient(account_url='https://andyprivate.blob.core.windows.net',
                         container_name='input',
                         blob_name='cities.csv',
                         credential=key,
                         max_chunk_get_size=1024,
                         max_single_get_size=1024)
stream = blob_client.download_blob()

for chunk in stream.chunks():
    print(len(chunk))
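
A minimal sketch of gluing the two examples together (assuming `stream` is the downloader created above; note the blob is still fully buffered before pandas starts parsing):

import io

import pandas as pd

# Accumulate the downloaded chunks into a buffer, then let pandas
# parse it one row at a time.
buf = io.BytesIO()
for chunk in stream.chunks():
    buf.write(chunk)
buf.seek(0)  # rewind; otherwise pandas sees an empty file

with pd.read_csv(buf, chunksize=1) as reader:
    for row in reader:
        print(row)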


– Jim Xu
  • Thanks very much indeed. Will try and get it implemented today and accept answer – Christopher Alcock Feb 08 '21 at 10:55
  • Thanks again for this but I still haven't managed to get this working together. `content` in your first example seems to be effectively the same as `csv_file` in my question, and I still don't see how I stream azure files into it? The chunk params in the second example seem very helpful tho, for optimizing once I have the stream processing actually working – Christopher Alcock Feb 08 '21 at 15:11
  • @ChristopherAlcock in the first sample, you can use the method `readinto()` to read into a `BytesIO` stream. Then use pandas to process the stream. – Jim Xu Feb 09 '21 at 01:54
  • Hi Jim, I've finally worked out what was going wrong for me here. The pandas read_csv unsurprisingly returns a pandas dataframe, which behaves very differently to the csv reader, so I had to change all my processing code too, which I foolishly hadn't expected. Thanks for your help. – Christopher Alcock Feb 10 '21 at 11:22
  • I've worked on this further: if I put my csv data into a BytesIO buffer from a string like so `buffer = BytesIO(csv_string.encode('utf-8'))`, I can then process it with `pandas.read_csv()`. However, if I get the data from Azure in the way suggested and read that into a buffer, such that calling `.getvalue()` gives output identical to the buffer-from-string, pandas cannot process the data: `pandas.errors.EmptyDataError('No columns to parse from file')`. I am tempted to believe that what I'm trying to do isn't possible – Christopher Alcock Feb 12 '21 at 16:46
  • @ChristopherAlcock please try to use the following code `stream = blob_client.download_blob() with BytesIO() as buf : stream.readinto(buf) pandas.read_csv(buf)` – Jim Xu Feb 13 '21 at 04:51
  • Hi Jim, Thanks again for trying, but this is essentially what I was already trying, and despite reading in a very simple valid csv, it returns `EmptyDataError: No columns to parse from file` – Christopher Alcock Feb 15 '21 at 09:39
  • If I do a `print(buf1.getvalue())` instead of `read_csv`, I get `b'Col1,Col2,Col3\nval1,val2,val3\nval4,val5,val6\nval7,val8,val9'`, which as far as I can see is totally acceptable csv data – Christopher Alcock Feb 15 '21 at 11:05