
I have a large dataset of daily files located at /some/data/{YYYYMMDD}.parquet (the layout could also be something like /some/data/{YYYY}/{MM}/{YYYYMMDD}.parquet).

I describe the data source in a mycat.yaml file as follows:

sources:
  source_partitioned:
    args:
      engine: pyarrow
      urlpath: "/some/data/*.parquet"
    description: ''
    driver: intake_parquet.source.ParquetSource

I want to be able to read a subset of the files (partitions) into memory.

If I run source = intake.open_catalog('mycat.yaml').source_partitioned; print(source.npartitions), I get 0, probably because the partition information is not yet initialized. After source.discover(), source.npartitions is updated to 1726, which is exactly the number of individual files on disk.

How would I load data:

  • only for a given day (e.g. 20180101)
  • for a period between two days (e.g. between 20170601 and 20190223)?

If this is described somewhere on the wiki, feel free to point me to the appropriate section.

Note: after thinking a little more, I realized this might be related to dask functionality, and my goal could probably be achieved by converting the source to a dask dataframe with the .to_dask method. I am therefore adding the dask tag to this question.

Mikhail Shevelev

2 Answers


There are at least two approaches:

  1. continue with the current approach of loading everything into dask (using *) and then subset to the required range.

  2. load only a specific subset of the data.

For option 2, intake's user parameters come in handy. So, assuming that the paths are /some/data/{YYYYMMDD}.parquet, the modified catalog entry would look like this:

sources:
  source_partitioned:
    parameters:
      date:
        type: str
        default: "*"
    args:
      engine: pyarrow
      urlpath: "/some/data/{{ date }}.parquet"
    description: ''
    driver: intake_parquet.source.ParquetSource

In Python, the date parameter can be provided (as a str in this case) via source = intake.open_catalog('mycat.yaml').source_partitioned(date='20211101') to load a specific date.
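For example, a minimal sketch (assuming the catalog above is saved as mycat.yaml and intake-parquet is installed):

import intake

cat = intake.open_catalog('mycat.yaml')

# override the date parameter to load a single day
source_day = cat.source_partitioned(date='20180101')
ddf_day = source_day.to_dask()   # lazy dask dataframe
df_day = source_day.read()       # or read eagerly into pandas

# with no argument, the default "*" is used and all files match
source_all = cat.source_partitioned()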

For date ranges, things are a bit trickier: one way would be to build the desired range with a list comprehension and then concatenate the files loaded individually, but that might not be efficient for large date ranges. In those cases I would load bigger chunks, e.g. by year using date="2017*", and concatenate these larger chunks afterwards.
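A rough sketch of that list-comprehension approach (assuming the parametrised entry above and that a file exists for every date in the range; missing dates would need to be filtered out first):

import dask.dataframe as dd
import intake
import pandas as pd

cat = intake.open_catalog('mycat.yaml')

# daily dates in the requested range, formatted to match the filenames
dates = pd.date_range('20170601', '20190223', freq='D').strftime('%Y%m%d')

# load each day lazily via the parametrised source, then concatenate
ddfs = [cat.source_partitioned(date=d).to_dask() for d in dates]
ddf = dd.concat(ddfs)

For a range this large this creates hundreds of small sources, which is why loading bigger chunks (e.g. per year) and concatenating those can be preferable.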

SultanOrazbayev
  • thank you very much for the answer. Can you elaborate on the first approach with dask? Let's say I do `ddf = source.to_dask()`, can I somehow index this dataframe based on a date range without reading every file? I think it is not easy as the date information is not available in the index/partitioning information of ddf? Even if I change the definition to have `urlpath: "/some/data/{date}.parquet"`, a date column is added to ddf (https://www.anaconda.com/blog/intake-parsing-data-from-filenames-and-paths), but I cannot filter based on it until I read all data, right? – Mikhail Shevelev Nov 21 '21 at 05:11
  • Glad it helped! No, AFAIK, using the first approach it's not possible to do what you describe without reading the files (at least the metadata). It's not too bad if the parquet files are indexed by datetime (and are not overlapping across files), because then dask will know from the metadata which partitions have which date ranges (so doing something like `.loc` will filter some of the files without loading them into memory). – SultanOrazbayev Nov 21 '21 at 05:37
  • And if the underlying parquet files are not indexed by datetime, then it might be efficient to go the second route: creating a function that generates a list of specific dates, then uses `intake` to load them into dask and concatenate them into a single dataframe (if necessary). – SultanOrazbayev Nov 21 '21 at 05:46
  • Let's say date is in the filenames. I still cannot figure out how to complete the approach with dask, but I really want to understand. Do you think you can provide an example? Please make all the assumptions you need, I just need some working example of the first approach that would avoid reading every single file. – Mikhail Shevelev Nov 21 '21 at 05:57

This is a follow-up on a comment to my previous answer.

If the parquet files are indexed by (non-overlapping) time, then dask does not need to read every file into memory: the metadata of every file is loaded up front, but only the relevant files are loaded into memory when the data is actually needed:

from dask.datasets import timeseries

# this lazy dataframe has 30 partitions (one per day of January 2000)
df = timeseries()

# this selection needs only 1 of those partitions
df.loc["2000-01-03"]

This can be useful if the downstream workflow operates with different subsets of a large dataframe, but the subsets that are needed change dynamically. The fixed cost of creating a large dask dataframe (using metadata only) is incurred once, and then dask takes care of selecting the subsets of the data that are needed.
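Translated to the parquet case, a minimal sketch (assuming the files were written with a sorted, non-overlapping datetime index so that dask knows the divisions):

import dask.dataframe as dd

# only the parquet metadata is read at this point
ddf = dd.read_parquet("/some/data/*.parquet", engine="pyarrow")

# with known datetime divisions, this slice is resolved from the
# metadata, so only the relevant files are read on compute
subset = ddf.loc["2017-06-01":"2019-02-23"].compute()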

If the parquet files are not indexed by time and the time information is only in the filename, then dask will not parse the information from the filename. In this case, some of the options are:

  • writing a custom loader function that filters the required filenames and passes them to dask (see the sketch after this list). This can reduce the fixed cost of creating the dask dataframe and is useful when it is known which subset of the overall data is needed;

  • using intake as per previous answer.
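A rough sketch of such a custom loader (hypothetical helper; it assumes the /some/data/{YYYYMMDD}.parquet layout from the question and that a file exists for every date, otherwise the path list would need filtering, e.g. with glob):

import dask.dataframe as dd
import pandas as pd

def load_range(start, end, template="/some/data/{}.parquet"):
    # build one path per day in [start, end] and hand the list to dask;
    # dask reads only the parquet metadata here, data is loaded on compute
    dates = pd.date_range(start, end, freq="D").strftime("%Y%m%d")
    return dd.read_parquet([template.format(d) for d in dates], engine="pyarrow")

ddf = load_range("20170601", "20190223")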

SultanOrazbayev