PyTorch DataPipes are a new kind of in-place dataset loader for large data that can be fed into PyTorch models through streaming. For reference:
- Official tutorial: https://pytorch.org/data/main/tutorial.html
- A crash-course post explaining the usage: https://sebastianraschka.com/blog/2022/datapipes.html
Given a `myfile.csv` file (assigned to the `csv_file` variable in the code below), whose contents look like this:
```
imagefile,label
train/0/16585.png,0
train/0/56789.png,0
...
```
The example code reads `csv_file` and creates an iterable dataset using `torchdata.datapipes`, and it looks something like this:
```python
from torchdata import datapipes as dp

def build_data_pipe(csv_file, transform, shuffle_buffer=1000, batch_size=32):
    new_dp = dp.iter.FileOpener([csv_file])
    new_dp = new_dp.parse_csv(skip_lines=1)
    # yields tuples like ('train/0/16585.png', '0')
    new_dp = new_dp.shuffle(buffer_size=shuffle_buffer)
    ...
    # More steps that keep reassigning `new_dp`; the result is a
    # lazily evaluated (not yet materialized) Iterable object.
    return new_dp
```
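To make the "lazy-loaded, unevaluated Iterable" part concrete, here is a plain-Python analogue of the same pipeline built from generators. This is not the torchdata API, just a sketch of the laziness: each stage wraps the previous one, and the file is only read when you finally iterate.

```python
import random

# Self-contained demo: write the sample myfile.csv from above first.
with open("myfile.csv", "w") as f:
    f.write("imagefile,label\ntrain/0/16585.png,0\ntrain/0/56789.png,0\n")

def open_file(path):
    # analogue of dp.iter.FileOpener: lazily yields lines from the file
    with open(path) as f:
        for line in f:
            yield line.rstrip("\n")

def parse_csv(lines, skip_lines=0):
    # analogue of .parse_csv(skip_lines=1): lazily yields tuples
    for i, line in enumerate(lines):
        if i >= skip_lines:
            yield tuple(line.split(","))

def buffered_shuffle(rows, buffer_size=1000):
    # analogue of .shuffle(buffer_size=...): shuffles within a bounded buffer
    buf = []
    for row in rows:
        buf.append(row)
        if len(buf) >= buffer_size:
            yield buf.pop(random.randrange(len(buf)))
    random.shuffle(buf)
    yield from buf

# Each step wraps the previous one; nothing is read from disk yet.
pipe = buffered_shuffle(parse_csv(open_file("myfile.csv"), skip_lines=1),
                        buffer_size=4)
rows = list(pipe)  # only now does the file actually get read
```

Like `new_dp`, `pipe` is just a cheap wrapper object until iteration forces the whole chain to run.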
If we look at each step and what gets assigned to `new_dp`, we see:
```python
>>> from torchdata import datapipes as dp

# The first call initializes a FileOpenerIterDataPipe
>>> new_dp = dp.iter.FileOpener(["myfile.csv"])
>>> new_dp
FileOpenerIterDataPipe

# The DataPipes API then exposes the next steps as partial functions
# registered on the class, e.g.
>>> new_dp.parse_csv
functools.partial(<function IterDataPipe.register_datapipe_as_function.<locals>.class_function at 0x213123>, <class 'torchdata.datapipes.iter.util.plain_text_reader.CSVParserIterDataPipe'>, False, FileOpenerIterDataPipe)

>>> new_dp = new_dp.parse_csv(skip_lines=1)
>>> new_dp
CSVParserIterDataPipe
```
It looks like `new_dp.parse_csv(skip_lines=1)` is trying to do a new initialization through a mix-in between `CSVParserIterDataPipe` and `FileOpenerIterDataPipe`, but I'm not exactly sure what's happening.
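For what it's worth, the `functools.partial` output above suggests it is wrapping rather than a mix-in: each functional call constructs a *new* datapipe whose source is the previous one. Below is a minimal pure-Python sketch of that registration pattern (my reading of it, not the actual torchdata internals):

```python
import functools

class Pipe:
    # Stand-in for IterDataPipe: later pipe classes get registered as
    # chainable methods on this base class.
    @classmethod
    def register_datapipe_as_function(cls, name, pipe_cls):
        def class_function(pipe_cls, source, *args, **kwargs):
            # construct a NEW pipe whose source is the previous pipe
            return pipe_cls(source, *args, **kwargs)
        # this partial is what the REPL output above shows for .parse_csv
        function = functools.partial(class_function, pipe_cls)
        setattr(cls, name, lambda self, *a, **kw: function(self, *a, **kw))

class Opener(Pipe):
    def __init__(self, paths):
        self.paths = paths

class CSVParser(Pipe):
    def __init__(self, source, skip_lines=0):
        self.source = source          # keeps a reference; no mix-in involved
        self.skip_lines = skip_lines

Pipe.register_datapipe_as_function("parse_csv", CSVParser)

opener = Opener(["myfile.csv"])
parsed = opener.parse_csv(skip_lines=1)
# `parsed` is a brand-new CSVParser wrapping `opener`; `opener` is unchanged
```

Under this reading, `new_dp = new_dp.parse_csv(skip_lines=1)` simply rebinds the name `new_dp` to a fresh `CSVParserIterDataPipe` that holds the old `FileOpenerIterDataPipe` as its source.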
To get a fully working datapipe, there's a whole chain of other `new_dp = new_dp.xxx()` calls to make. And my questions are:

Q1. Can't the DataPipe be initialized in a non-sequential way? (P/S: This didn't work as expected)
```python
from torchdata import datapipes as dp

class MyDataPipe(dp.iterGenericDataPipe):
    def __init__(self, csv_file, skip_lines=1, shuffle_buffer=1000):
        super().__init__([csv_file])
        self.parse_csv(skip_lines=skip_lines)
        self.shuffle(buffer_size=shuffle_buffer)
```
But given that each step has to overwrite `new_dp`, it seems like we might have to do something like:
```python
from torchdata import datapipes as dp

class MyDataPipe(dp.iterGenericDataPipe):
    def __init__(self, csv_file, skip_lines=1, shuffle_buffer=1000):
        super().__init__([csv_file])
        self = self.parse_csv(skip_lines=skip_lines)
        self = self.shuffle(buffer_size=shuffle_buffer)
```
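(I suspect part of the problem with this second attempt is independent of torchdata: inside `__init__`, `self` is just a local variable, so rebinding it only changes the local name and never affects the object the caller gets back. A minimal plain-Python illustration:)

```python
class A:
    def __init__(self):
        self.tag = "original"
        # Rebinding the local name `self` has no effect on the
        # instance that the constructor hands back to the caller.
        self = {"tag": "replacement"}

a = A()
# `a` is still an A instance, with tag == "original"
```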