
I need to process a large remote CSV line by line without downloading it entirely.

Below is the closest I have got: I iterate over byte chunks from Azure and stitch truncated lines back together. But this cannot work when CSV values contain a newline, because I have no way to distinguish newlines inside values from the newlines that actually terminate records.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    # get a StorageStreamDownloader
    # https://learn.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.storagestreamdownloader?view=azure-python
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()

    truncated_line = ''
    for chunk in file_handle.chunks():
        # prepend the previous truncated line to the next chunk
        chunk_txt = truncated_line + chunk.decode("utf-8")
        lines = chunk_txt.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
        # yield every complete line, keep the trailing partial line for the next chunk
        for line in lines[:-1]:
            yield line
        truncated_line = lines[-1]

    # yield whatever is left over after the last chunk
    if truncated_line:
        yield truncated_line
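
For illustration, here is the kind of record that defeats a naive '\n' split (hypothetical data, semicolon-separated with a quoted multiline value):

row = 'id;comment\n1;"first line\nsecond line"\n'
print(row.split('\n'))
# => ['id;comment', '1;"first line', 'second line"', '']
# the quoted value is wrongly broken across two "lines"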

Ideally I would use csv.DictReader(), but I was not able to do so, as my attempt below downloads the file entirely.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    buffer = io.BytesIO()
    file_handle.readinto(buffer) # THIS DOWNLOADS THE FILE ENTIRELY
    buffer.seek(0)
    # DictReader needs a text stream, so wrap the bytes buffer
    text_buffer = io.TextIOWrapper(buffer, encoding="utf-8", newline="")
    csvreader = csv.DictReader(text_buffer, delimiter=";")
    return csvreader

Here is an update using some hints from @H.Leger.

Please note that this still does not work:

file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
stream = codecs.iterdecode(file_handle.chunks(), 'utf-8')
csvreader = csv.DictReader(stream, delimiter=";")
for row in csvreader:
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?

EDIT: Final solution, based on @paiv's answer

EDIT: Updated solution to use io instead of codecs for faster parsing

import io
import csv
import ctypes as ct

# bytes chunk iterator to python stream adapter 
# https://stackoverflow.com/a/67547597/2523414

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        self.closed = False
    
    def readable(self):
        return True
        
    def writable(self):
        return False
    
    def seekable(self):
        return False
        
    def close(self):
        self.closed = True
        
    def read(self, size):
        # refill the buffer from the next chunk when it is empty;
        # next() returns b'' once the iterator is exhausted, signalling EOF
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res



# get the downloader object
file_client = client.get_file_client(file_path)
downloader = file_client.download_file()
# adapt the downloader iterator to a byte stream
file_object = ChunksAdapter(downloader.chunks())
# decode bytes stream to utf-8
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='') 

# update csv field limit to handle large fields
# https://stackoverflow.com/a/54517228/2523414
csv.field_size_limit(int(ct.c_ulong(-1).value // 2)) 

csvreader = csv.DictReader(text_stream, delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in csvreader:
    print(row)
Clément Prévost
  • Have you tried to use pandas: https://www.kite.com/python/answers/how-to-read-a-large-csv-file-in-chunks-with-pandas-in-python? We can define how many lines we read at one time. – Jim Xu May 13 '21 at 02:51
  • @Jim Xu I couldn't find a way to use this without downloading the entire file first – Clément Prévost May 14 '21 at 06:16
  • Hi, pandas.read_csv supports HTTP URLs. I think you can try it with the file URL which contains a token. – Jim Xu May 14 '21 at 06:39
  • @Jim Xu I don't know if I can use a URL for this. I use the Python azure datalake storage client with cert auth – Clément Prévost May 14 '21 at 21:19
  • @ClémentPrévost Can you provide a small sample of the csv you're trying to parse? – dracarys May 15 '21 at 13:49
  • It seems that `.chunks()` will issue a separate HTTP request for each chunk. If this is acceptable to you, you can adapt `.chunks()` interface into a read-only file object https://docs.python.org/3/glossary.html#term-file-object, and stream from it via utf-8 codec into csv reader. – paiv May 15 '21 at 18:25
  • @dracarys It's a standard CSV with ; separator and " as quote char. Columns are only quoted if they are multiline or contain a separator – Clément Prévost May 16 '21 at 15:03
  • @paiv I'm not sure how I could do that :s – Clément Prévost May 16 '21 at 15:04
  • When a file is large, chunking is the way to go. Usually, when CSV values contain newline characters, the start and end of that particular value are delimited by quotes `"`. Based on that, you can write an iterator which parses character by character and yields only when an actual CSV line break is encountered (a rough sketch of this idea appears after these comments). Per [Wikipedia](https://en.wikipedia.org/wiki/Comma-separated_values#Basic_rules), fields with embedded line breaks must be quoted. – risahbhc32 May 16 '21 at 16:53
  • @Rishabh Chauhan That's an idea, but that means reimplementing a CSV parser, which is not trivial. Also I have to handle doubled quotes `""` inside a value. Isn't there a simpler and more standard way? – Clément Prévost May 17 '21 at 09:06
  • @ClémentPrévost How large is the CSV? – risahbhc32 May 17 '21 at 10:00
  • @ClémentPrévost This is the closet I have found to a streaming CSV parser in python. https://github.com/frictionlessdata/tabulator-py#working-with-stream – risahbhc32 May 17 '21 at 10:18
  • @risahbhc32 too large to download locally, 100s of GB. – Clément Prévost May 17 '21 at 13:53
  • Updated the question with a new idea; it does not work either, but for another reason – Clément Prévost May 17 '21 at 14:58
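
A rough sketch of the record-splitting idea from @risahbhc32's comment (illustrative only, not code from the question or the answers: it assumes the chunks are already decoded to text, relies on fields with embedded line breaks being quoted, and copes with doubled quotes "" only because each quote character toggles the flag twice):

def iter_csv_records(chunks, quotechar='"'):
    record = []
    in_quotes = False
    for chunk in chunks:
        for ch in chunk:
            if ch == quotechar:
                in_quotes = not in_quotes
            if ch == '\n' and not in_quotes:
                # a real record boundary: emit the accumulated record
                yield ''.join(record)
                record = []
            else:
                # quote characters and value newlines are kept as data
                record.append(ch)
    if record:
        yield ''.join(record)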

2 Answers


Disclaimer: I know little about Azure specifics. Ultimately, you would want to stream the separate chunks too.

In Python, given a file object, you can set up CSV streaming this way:

import codecs
import csv
codec = codecs.getreader('utf-8')
text_stream = codec(file_object)
csvreader = csv.DictReader(text_stream)

Now you can iterate over csvreader, and it will read from file_object in a streaming fashion.

Edit: as @Martijn Pieters suggested, we can gain performance with TextIOWrapper instead of codecs:

text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

Check the note on the newline parameter in the csv module documentation: if newline='' is not specified, newlines embedded inside quoted fields will not be interpreted correctly.

But Azure's StorageStreamDownloader does not provide Python's file object interface. It has a .chunks() generator (which I assume will issue a separate HTTP request to retrieve each chunk).

You can adapt .chunks() into a file object with a simple adapter:

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res

And use it like this:

downloader = file_client.download_file()
file_object = ChunksAdapter(downloader.chunks())

Be sure to configure DictReader for the appropriate CSV dialect.
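
For the semicolon-separated, double-quoted format described in the question, that would look something like this (delimiter and quote character taken from the question, not from Azure):

csvreader = csv.DictReader(text_stream, delimiter=';', quotechar='"')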

And set appropriate values for max_single_get_size, max_chunk_get_size on the blob client.
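
A minimal sketch of that client configuration (assuming the Data Lake clients forward these keyword arguments to the underlying blob client; the connection string and file system name are hypothetical):

from azure.storage.filedatalake import DataLakeServiceClient

service = DataLakeServiceClient.from_connection_string(
    conn_str,                             # hypothetical connection string
    max_single_get_size=4 * 1024 * 1024,  # size of the initial download request
    max_chunk_get_size=4 * 1024 * 1024,   # size of each subsequent range request
)
client = service.get_file_system_client("my-filesystem")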

paiv

I believe the requests package can be useful for you. Using the stream option when getting your file, together with the Response.iter_lines() function, should do what you need:

import codecs
import csv
import requests

url = "https://navitia.opendatasoft.com//explore/dataset/all-datasets/download?format=csv"
r = requests.get(url, stream=True)  # using the stream option to avoid loading everything

try:
    buffer = r.iter_lines()  # iter_lines() will feed you the remote file line by line
    reader = csv.DictReader(codecs.iterdecode(buffer, 'utf-8'), delimiter=';')
    for row in reader:
        print(row)  # Do stuff here
finally:
    r.close()
H.Leger
  • Thank you for your answer. I have 2 concerns here: I only have access to the Azure lib that feeds me a bytes-chunk iterator, so I cannot use requests. Even if that were possible, do iter_lines and DictReader understand newlines inside column values? – Clément Prévost May 17 '21 at 12:09
  • Oops, I didn't realize azure was so special. For your second point, there should be no issue with newlines inside string columns if the file is properly formatted. – H.Leger May 18 '21 at 08:00
  • I updated the question to use codecs.iterdecode and I have a newline-related error. – Clément Prévost May 18 '21 at 08:37