
I want to load a CSV into a dataclass in Python. The dataclass consists of strings and enums, and I want to parse the values accordingly. I know there is a Python library that does this, but it does not allow skipping malformed rows, which unfortunately exist.

I have created a method for this that reads a file and looks like this:

def dataset_reader(path: str):
        #with open(path, 'r') as csv_handler:
        reader = csv.reader(path)
        header = reader.__next__()
        expected_order = fields(MyFancyDataclass)
        order_mapping = {fieldname: index for index, fieldname in enumerate([field.name for field in expected_order])}
        header_mapping = {rowname: index for index, rowname in enumerate(header)}
        order = [header_mapping.get(i[0]) for i in sorted(order_mapping.items(), key=lambda x: x[1])]
        types = [type_ for type_ in [field.type for field in fields(MyFancyDataclass)]]
        for line in reader:
            try:
                #yield MyFancyDataclass(*[line[x] if types[x] == str else types[x](line[x]) for x in order])
                yield MyFancyDataclass(line[order[0]], line[order[1]], line[order[2]], line[order[3]], SourceType(line[order[4]]), line[order[5]], line[order[6]], line[order[7]],)
            except Exception as e:
                logging.error(line)

What I'm essentially trying to do is to not assume the order in which the CSV columns are written. As long as the required columns are in the file, we parse it. To do this, I first read the header and build a column-name-to-index mapping. I then do the same for the dataclass fields and derive the order in which to pick columns from each CSV row.

Then I read the CSV row by row. What you see there are two approaches: one commented out (which is way more elegant, as we do not hardcode the number of columns) and one that is faster.

The problem I have now is that it is still quite slow. As we deal with big data, this is a bit of an issue. Any good ideas on how we could speed it up? A no-go is assuming the column order in the CSVs: although it should consistently be the same order, we do not want to rely on that. As essentially everything in the current yield is simply a lookup, I do not see what else we could improve to gain speed.

Thanks for all the help in advance!

CSV for reproduction. Call it test.csv:

key,value
123,aaa
234,bbb
12,aaa
1919191,bbb
12,
13,aaa
,bbb
,
123,bbb

Full minimal Python script for reproduction. Store it in the same folder as test.csv:

from dataclasses import fields, dataclass
import logging
import csv
from enum import Enum

class SourceType(Enum):
        a = "aaa"
        b = "bbb"

@dataclass
class MyFancyDataclass:
        key: str
        value: SourceType

def dataset_reader(path: str):
        #with open(path, 'r') as csv_handler:
        reader = csv.reader(path)
        header = reader.__next__()
        expected_order = fields(MyFancyDataclass)
        order_mapping = {fieldname: index for index, fieldname in enumerate([field.name for field in expected_order])}
        header_mapping = {rowname: index for index, rowname in enumerate(header)}
        order = [header_mapping.get(i[0]) for i in sorted(order_mapping.items(), key=lambda x: x[1])]
        types = [type_ for type_ in [field.type for field in fields(MyFancyDataclass)]]
        print(order)
        for line in reader:
            try:
                #yield MyFancyDataclass(*[line[x] if types[x] == str else types[x](line[x]) for x in order])
                yield MyFancyDataclass(line[order[0]], SourceType(line[order[1]]),)
            except Exception as e:
                print(e)
                logging.error(line)


if __name__=="__main__":
        print(list(dataset_reader(open("test.csv"))))
Syrius
  • That playing around with `order_mapping/header_mapping ...` looks so overcomplicated. I'm sure there's a better way. Can you post your minimal `MyFancyDataclass` and `SourceType` definitions and a few-line fragment of your csv file (along with a bad line in between)? – RomanPerekhrest Mar 01 '23 at 19:46
  • Don't call `__next__` explicitly; use `header = next(reader)` to call it. – chepner Mar 01 '23 at 19:56
  • I would define a class method for `MyFancyDataclass` that takes `order[0:4]` and a `SourceType` instance as arguments and returns the appropriate instance of `MyFancyDataclass`. – chepner Mar 01 '23 at 20:01
  • @RomanPerekhrest I fully agree that it looks overcomplicated. I also hope that there's a simpler way. I updated the question with a full python script + csv for reproduction. – Syrius Mar 01 '23 at 21:30
  • @chepner But if you pass it, you just move the problem to a different location. The problem itself will still be there, the logic still needs to be implemented. And it should be efficient. Any advice on this? – Syrius Mar 01 '23 at 21:31
  • Do you have any evidence that this isn't slow simply because you have a lot of data to read in? – chepner Mar 01 '23 at 21:51
  • @Syrius and how large is your actual csv (in rows) ? – RomanPerekhrest Mar 01 '23 at 22:20
  • @RomanPerekhrest In the end it's a bunch of CSVs, in total around 400 million rows with on average around 30 million per file (varies from ~10 to ~60 million). – Syrius Mar 02 '23 at 09:03
  • @Syrius, I guess you're not collecting ~400 millions of `MyFancyDataclass` objects at once, but parsing one file, collecting objects, doing something with those objects (filtering /reducing /saving), then move on to next file, right ? – RomanPerekhrest Mar 02 '23 at 10:46
  • Yes, that's correct. But the read is still a bottleneck that we want to address, as it essentially takes 2x as long to read + parse than to simply read. – Syrius Mar 02 '23 at 15:33
  • 1
    @Syrius, I'm testing your code on 1M records (repeated from your csv sample) and I see that objects like `MyFancyDataclass(key='', value=)` fall into the result list. Shouldn't they be treated/captured as bad lines (any empty field) ? – RomanPerekhrest Mar 02 '23 at 19:34
  • Yes, you're right. Currently only failed values get captured. That's not perfect, but handled downstream atm. – Syrius Mar 02 '23 at 20:35
  • 2
    @Syrius, I found the most (time) performance penalty in this `logging.error(line)`, dozens of millions I/O operations to write log messages into a file; that significantly slows down the processing – RomanPerekhrest Mar 02 '23 at 22:39
  • Writing the bad rows to a CSV was about 6x faster than logging it to a file, when I tested on 1M rows with ~40% being bad. Did any of the comments or solutions get you and your team on the right track? – Zach Young Mar 09 '23 at 21:44
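
Following up on those last two comments, here is a minimal sketch of replacing the per-row logging.error() with a csv.writer for bad rows (it reuses MyFancyDataclass and SourceType from the script above; the bad_rows.csv filename and bad_path parameter are just for illustration):

import csv

def dataset_reader(path: str, bad_path: str = "bad_rows.csv"):
    with open(path, newline="") as f, open(bad_path, "w", newline="") as bad:
        reader = csv.reader(f)
        bad_writer = csv.writer(bad)
        header = next(reader)
        key_idx = header.index("key")
        value_idx = header.index("value")
        for line in reader:
            try:
                yield MyFancyDataclass(line[key_idx], SourceType(line[value_idx]))
            except (ValueError, IndexError):
                bad_writer.writerow(line)  # one cheap write instead of a logging call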

2 Answers


I think you'll get a lot of mileage out of csv's DictReader; it glosses over the column-ordering issue:

with open(csv_fname, newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        yield Fancy(key=row["key"], value=row["value"])

Next, I don't know exactly what you need, because your sample data has blank values ("") and I don't see how you deal with that circumstance in your dataclass, which must take a string and your enumerated value.

I'll offer this to show how you might deal with blank values in the CSV (I also renamed your enum class to just Value for brevity):

@dataclass
class Fancy:
    key: str | None
    value: Value | None

    def __init__(self, key: str, value: str) -> None:
        """Create a Fancy from CSV string values, including blanks ("")."""
        self.key = key if key else None
        self.value = Value(value) if value else None

And what do you want to do if a non-enumerated string value (e.g., "ccc") is passed in?
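
If unknown values should simply be treated like blanks, one small sketch (the parse_value helper name is just for illustration) is to fall back to None:

def parse_value(value: str) -> Value | None:
    try:
        return Value(value)
    except ValueError:  # e.g. "ccc" is not a defined Value member
        return None

The Fancy constructor could then use self.value = parse_value(value) instead of the inline conditional.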

I pared down your sample CSV to just this, named input-kv.csv:

key,value
123,aaa
456,bbb
789,
,bbb
,

and also flipped the columns, input-vk.csv:

value,key
aaa,123
bbb,456
,789
bbb,
,

When I run the whole thing:

import csv

from dataclasses import dataclass
from enum import Enum
from typing import Generator


class Value(Enum):
    a = "aaa"
    b = "bbb"


@dataclass
class Fancy:
    key: str | None
    value: Value | None

    def __init__(self, key: str, value: str) -> None:
        """Create a Fancy from CSV string values, including blanks ("")."""
        self.key = key if key else None
        self.value = Value(value) if value else None


def fancy_iter(csv_fname: str) -> Generator[Fancy, None, None]:
    with open(csv_fname, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield Fancy(key=row["key"], value=row["value"])


for kv, vk in zip(
    fancy_iter("input-kv.csv"),
    fancy_iter("input-vk.csv"),
):
    print(f"kv: {str(kv):<40}   vk: {str(vk):<40}")

I get:

kv: Fancy(key='123', value=<Value.a: 'aaa'>)   vk: Fancy(key='123', value=<Value.a: 'aaa'>)
kv: Fancy(key='456', value=<Value.b: 'bbb'>)   vk: Fancy(key='456', value=<Value.b: 'bbb'>)
kv: Fancy(key='789', value=None)               vk: Fancy(key='789', value=None)            
kv: Fancy(key=None, value=<Value.b: 'bbb'>)    vk: Fancy(key=None, value=<Value.b: 'bbb'>) 
kv: Fancy(key=None, value=None)                vk: Fancy(key=None, value=None)   

Which isn't totally accurate, because the Fancy constructor only accepts strings, but I think it gets the point across that between flipped columns and blank fields you get the same, consistent result.

Zach Young
  • Wrong rows should simply be ignored, like it is done in the try/catch block. We do not want to recover from this, but just move on. But this could also easily be done with your approach. The code looks quite elegant, but the most important requirement for us is speed. Without any proof (I will actually test that in the near future), I assume that creating a dictionary with the DictReader takes longer per row than reading a plain list, as you need to allocate more memory. But admittedly I did not find any benchmarks on that. – Syrius Mar 02 '23 at 09:01
  • DictReader takes 24 minutes whereas the "normal" reader takes 19 minutes on the same input. I have to benchmark both approaches with the conversion to the data class later on. But I honestly guess that the DictReader will still take longer, as I just access indices in a list, which should be ~constant time in Python. – Syrius Mar 02 '23 at 09:53
  • Thank you for the comments. I definitely wanted to show what I thought was the "right way", following the mantra I was taught, "make it work ➡️ make it work right ➡️ make it work fast". I'll try to think if there's a faster way that's also clean (elegant, like you said). Have you or your team profiled either solution? – Zach Young Mar 02 '23 at 14:23
  • Also, in real life how many columns do you have to contend with? And is it a fixed number of columns and just varying order, or do you have to deal with a range of possible input columns? And, how many rows? – Zach Young Mar 02 '23 at 14:25
  • It is semi-fixed. It could be that the number of columns will change in the future (new columns added), but it should be static over a long period of time. In real life we deal with around 400 million rows (although this will definitely grow) split over multiple files. A file typically has around 30 million rows. – Syrius Mar 02 '23 at 15:31

If speed matters most, you'll want to cut out as much dynamism as possible in your row-in-reader loop... at least I think... I haven't timed or profiled this, just going off your own analysis that DictReader was too slow, so...

What's the fastest way I could think of to deal with an arbitrary order of columns? Explicitly name the columns you expect, then get their row indices from the header:

def fancy_iter(csv_fname: str) -> Generator[Fancy, None, None]:
    reader = csv.reader(open(csv_fname))
    header = next(reader)

    # Hard-coded list of columns your team knows and maintains
    idx_a = header.index("col_A")
    idx_b = header.index("col_B")
    idx_c = header.index("col_C")

    for row in reader:
        yield Fancy(
            a=row[idx_a],
            b=int(row[idx_b]),
            c=RomanNumeral(row[idx_c]),
        )

Otherwise, like you were doing, you'll be doing some kind of field-to-column mapping lookup inside the row-in-reader loop, and I can see that really dragging down performance on millions of rows. But, again, that's just my intuition/speculation.
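
One way to keep a header-derived mapping while still doing almost nothing per row is to precompute an operator.itemgetter over the header indices. This is only a sketch I haven't benchmarked (it trims the enum to two members to stay short):

import csv
import operator
from dataclasses import dataclass
from enum import Enum
from typing import Generator


class RomanNumeral(Enum):
    I = "i"
    II = "ii"


@dataclass
class Fancy:
    a: str
    b: int
    c: RomanNumeral


def fancy_iter(csv_fname: str) -> Generator[Fancy, None, None]:
    with open(csv_fname, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        # One header lookup per column, done once, outside the row loop.
        get_cols = operator.itemgetter(
            header.index("col_A"), header.index("col_B"), header.index("col_C")
        )
        for row in reader:
            try:
                a, b, c = get_cols(row)
                yield Fancy(a, int(b), RomanNumeral(c))
            except (ValueError, IndexError):
                continue  # malformed row: wrong width, bad int, or unknown numeral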

Here's my complete test program (and, I've changed your values/semantics around again to try and better illustrate the scope of the problem):

import csv
import io
from dataclasses import dataclass
from enum import Enum
from typing import Generator


class RomanNumeral(Enum):
    I = "i"
    II = "ii"
    III = "iii"
    IV = "iv"
    V = "v"


@dataclass
class Fancy:
    a: str
    b: int
    c: RomanNumeral

    def __repr__(self) -> str:
        a = f"a='{self.a}',"
        b = f"b={self.b},"
        c = f"c={self.c}"
        return f"Fancy( {a:<10} {b:<4} {c:<18} )"


def fancy_iter(csv_file: io.TextIOBase) -> Generator[Fancy, None, None]:
    reader = csv.reader(csv_file, skipinitialspace=True)
    header = next(reader)

    # Hard-coded list of columns your team knows and maintains
    idx_a = header.index("col_A")
    idx_b = header.index("col_B")
    idx_c = header.index("col_C")

    for row in reader:
        if "" in row:
            continue  # the row is "incomplete"; don't try, just move on

        yield Fancy(
            a=row[idx_a],
            b=int(row[idx_b]),
            c=RomanNumeral(row[idx_c]),
        )


def main():
    # Two sets of "files" with different column orders, the last also has
    # invalid rows
    cst_strs = [
        """
col_A,col_B,col_C
one,1,i
four,4,iv
three,3,iii
""",
        """
col_C,col_A,col_B
i,one,1
ii,two,2
iii,three,3
iv,,4
v,five,
,,
,
""",
    ]

    for i, csv_str in enumerate(cst_strs, start=1):
        csv_file = io.StringIO(csv_str.strip())
        fancies = list(fancy_iter(csv_file))
        print(f"{i}:")
        for fancy in fancies:
            print(f"  {fancy}")


if __name__ == "__main__":
    main()
The output:

1:
  Fancy( a='one',   b=1, c=RomanNumeral.I   )
  Fancy( a='four',  b=4, c=RomanNumeral.IV  )
  Fancy( a='three', b=3, c=RomanNumeral.III )
2:
  Fancy( a='one',   b=1, c=RomanNumeral.I   )
  Fancy( a='two',   b=2, c=RomanNumeral.II  )
  Fancy( a='three', b=3, c=RomanNumeral.III )

I also changed out the try/catch for an explicit if/continue, based on my understanding of this answer, Cost of exception handlers: the if-block is faster overall, and definitely faster in the expected case that rows will be invalid.
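
If you want to sanity-check that on your own mix of good and bad rows, here's a quick synthetic sketch with timeit (the row data and the 50% bad-row ratio are made up; absolute numbers will vary by machine):

import timeit

rows = [["one", "1", "i"], ["", "", ""]] * 500_000  # half the rows are "bad"

def with_if():
    total = 0
    for row in rows:
        if "" in row:
            continue
        total += int(row[1])
    return total

def with_try():
    total = 0
    for row in rows:
        try:
            total += int(row[1])
        except ValueError:  # int("") raises
            continue
    return total

print("if/continue:", timeit.timeit(with_if, number=3))
print("try/except: ", timeit.timeit(with_try, number=3))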

Your team will have to keep the fieldnames in the dataclass and the idx_ vars in the function in sync, but that's the tradeoff (as I see it) for more speed during runtime. Anyways, you appear to rely on type hints (and maybe a linter?), which will help catch mismatches. Some mismatches will at least result in a runtime error:

  • if iter func falls out of sync with the CSV itself, the column mapping might fail (if the names changed, or the CSV is missing a column)
  • if iter func falls out of sync with the dataclass, the Fancy() init will fail

And, you could just have a unit test that exercises the current, complete set of columns, as sketched below.
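
For instance, a minimal pytest-style sketch (the test name is mine; it assumes the Fancy, RomanNumeral, and fancy_iter definitions from the test program above):

import io

def test_fancy_iter_knows_all_columns():
    # If a column were renamed or dropped, header.index(...) inside
    # fancy_iter would raise ValueError and this test would fail.
    csv_file = io.StringIO("col_A,col_B,col_C\none,1,i\n")
    assert list(fancy_iter(csv_file)) == [Fancy(a="one", b=1, c=RomanNumeral.I)]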

For fun

I came up with a scheme of putting docstrings with the CSV column names below the fieldnames, matching the column names in the iter func's index-mapping lines, like:

a: str
"""col_A"""
b: int
"""col_B"""
c: RomanNumeral
"""col_C"""

and came up with a "linter" (my first time trying to use the ast module) that ensures those docstrings match the string literals in the fancy iter func:

# Ensure that the fancy_iter() function "knows" the correct and
# complete mapping of CSV column names to the Fancy dataclass
# fieldnames.

import ast
import sys


MAIN_PY = "main2.py"


def get_dataclass_cols(dataclass_node: ast.ClassDef) -> set[str]:
    """
    Look in the Fancy dataclass for pairs of lines of
    fieldname-line and docstring-line (column name in CSV), like:

        class Fancy:
            a: str
            '''col_A'''
            b: int
            '''col_B'''

    and return a set of CSV column names, e.g., {'col_A', 'col_B'}
    """
    _node = dataclass_node

    cols: set[str] = set()

    # Looking for pairs of AST objects like:
    #   AnnAssign( ... )                  <-- an AnnAssign node
    #   ...                                   followed by...
    #   Expr(                             <-- an Expr node
    #     value=Constant(value='col_A'))  <--   w/a Constant w/a string value (the column name)

    for i in range(len(_node.body) - 1):  # stop one early; we always peek at body[i + 1]
        # Verify "lines" 1 & 2 are AnnAssign and Expr
        node1 = _node.body[i]
        if not isinstance(node1, ast.AnnAssign):
            continue
        node2 = _node.body[i + 1]
        if not isinstance(node2, ast.Expr):
            continue
        expr = node2

        # Verify Expr has string Constant
        if not isinstance(expr.value, ast.Constant):
            continue
        const = expr.value
        if not isinstance(const.value, str):
            continue

        cols.add(const.value)

    return cols


def get_iterfunc_cols(func_node: ast.FunctionDef) -> set[str]:
    """
    Look in the CSV iter func for lines assigning column names to indexes,
    beginning with "idx_", like:

        idx_a = header.index("col_A")
        idx_b = header.index("col_B")

    and return a set of CSV column names, e.g., {'col_A', 'col_B'}
    """
    cols: set[str] = set()

    # Looking for AST objects like:
    #   Assign(                                <-- an Assign node
    #     targets=[                            <--   w/a target
    #       Name(id='idx_b', ctx=Store())],    <--     w/a Name that starts with 'idx_'
    #     value=Call(                          <--   and a Call node...
    #       ...
    #       args=[                             <--     w/an arg
    #         Constant(value='col_B') ],       <--       w/a Constant w/a string value (the column name)
    #   )

    for node in func_node.body:
        # Verify Assign with correct Name
        if not isinstance(node, ast.Assign):
            continue
        if len(node.targets) == 0:
            continue
        target = node.targets[0]
        if not isinstance(target, ast.Name):
            continue
        name = target
        if not name.id.startswith("idx_"):
            continue
        if not isinstance(node.value, ast.Call):
            continue

        # Verify Call with correct string Constant
        call = node.value
        if len(call.args) == 0:
            continue
        arg = call.args[0]
        if not isinstance(arg, ast.Constant):
            continue
        const = arg
        if not isinstance(const.value, str):
            continue

        cols.add(const.value)

    return cols


def error(msg: str):
    print("Error, " + msg, file=sys.stderr)
    sys.exit(1)


def main():
    iterfunc_cols: set[str] = set()
    dataclass_cols: set[str] = set()

    main_body = ast.parse(open(MAIN_PY).read()).body
    for node in main_body:
        if isinstance(node, ast.FunctionDef) and node.name == "fancy_iter":
            iterfunc_cols = get_iterfunc_cols(node)

        if isinstance(node, ast.ClassDef) and node.name == "Fancy":
            dataclass_cols = get_dataclass_cols(node)

    if len(dataclass_cols) == 0:
        error("did not find any columns in the dataclass")

    if len(iterfunc_cols) == 0:
        error("did not find any columns in the iter func")

    if iterfunc_cols != dataclass_cols:
        err_msg = "\n".join(
            [
                "columns do not match:",
                "  dataclass_cols: %s" % sorted(dataclass_cols),
                "  iterfunc_cols:  %s" % sorted(iterfunc_cols),
            ]
        )
        error(err_msg)


if __name__ == "__main__":
    main()

So long as your dataclass and iter func are in sync:

a: str             
"""col_A"""        idx_a = header.index("col_A")
b: int             
"""col_B"""        idx_b = header.index("col_B")
c: RomanNumeral    
"""col_C"""        idx_c = header.index("col_C")

the linter is happy. But as soon as the linter doesn't find any columns in either the dataclass or the iter func, or it sees the two are out of sync:

a: str             
"""col_A"""        idx_a = header.index("col_a")
b: int             
"""col_B"""        idx_b = header.index("col_B")
c: RomanNumeral    
"""col_C"""        idx_c = header.index("col_C")
Error, columns do not match:
  dataclass_cols: ['col_A', 'col_B', 'col_C']
  iterfunc_cols:  ['col_B', 'col_C', 'col_a']

or

a: str             
"""col_A"""        idx_a = header.index("col_A")
b: int             
"""col_B"""        idx_b = header.index("col_B")
c: RomanNumeral    
"""col_C"""        idx_c = header.index("col_C")
d: float
"""col_D"""
Error, columns do not match:
  dataclass_cols: ['col_A', 'col_B', 'col_C', 'col_D']
  iterfunc_cols:  ['col_A', 'col_B', 'col_C']

Either of those errors would raise exceptions at runtime:

    idx_a = header.index("col_a")
            ^^^^^^^^^^^^^^^^^^^^^
ValueError: 'col_a' is not in list

or:

    yield Fancy(
          ^^^^^^
TypeError: Fancy.__init__() missing 1 required positional argument: 'd'
Zach Young
  • I did some performance testing of dealing w/ invalid rows, [here](https://gist.github.com/zacharysyoung/0205343c49a121f4462aca862c4a615f). I've updated 1) the csv.reader to use `skipinitialspace=True` to reduce all-blank fields to `""`; 2) the if-continue block in the for-row-in-reader loop. – Zach Young Mar 04 '23 at 20:31