
I have a function that creates a data.table with around 29 million rows and a user-defined number of columns based on an input sample list. It reads individual sample files, each with an index column, and joins them column-wise to a master index column to create this large data.table.

index sample1 sample2
1 1 2
2 3 4
... ... ...
29m 5 6

Since this takes up a lot of memory when done all at once, I'd like to read in a few files at a time, join them to the index, write them to disk as parquet files, and clear them from memory. From the documentation, I cannot tell whether this is possible, or whether partitioning is only possible when the full dataset is present and only on the values in the columns rather than on the column names themselves. Doing something like


arrow::write_dataset(DT, partitioning = c("sample1", "sample2", ...))

gives a directory with as many sub-directories as there are values in each of those columns.

I have tried an approach that writes parquets to a directory, but reading them back in is proving hard since I can't figure out how to read multiple parquet files and join them columnwise. I've asked another question here about that: How to write multiple arrow/parquet files in chunks while reading in large data quantities of data so that all written files are one dataset?

If I have 10 sample files and want to chunk them in groups of 5, I'm expecting to make a directory of 2 parquet files, each with the index column and 5 sample columns.

part-1.parquet: index, sample1, sample2, sample3, sample4, sample5
part-2.parquet: index, sample6, sample7, sample8, sample9, sample10

I can also have three files - one index parquet, one parquet with the first 5 samples, and another with the last 5 samples - if column duplication is not good practice.

index.parquet: index
part-1.parquet: sample1, sample2, sample3, sample4, sample5
part-2.parquet: sample6, sample7, sample8, sample9, sample10
  • Is there a reason not to reshape from [wide to long form](https://stackoverflow.com/questions/2185252/reshaping-data-frame-from-wide-to-long-format)? – alistaire May 12 '23 at 17:48
  • I tried it in long form and partitioning works fine by sample. The individual files are less than 20M, which, according to the docs, is not the best way to partition, and I could expect to see as many as 1000 samples. The data also needs to go to downstream analysis packages that expect a wide-form data matrix, and I thought it would be easier if I could write them to disk that way. – basesorbytes May 12 '23 at 18:25

1 Answer


If I understand correctly:

  • the "normal" way that arrow does partitioning is by groups of rows, the assumption is that they all share the same columns; whereas
  • you want to read multiple parquet files and have them additive as columns instead.

I think the best way to go is to do a lazy read (with or without filtering) and Reduce with a join (full_join in the example below; left_join works similarly) on whichever column is an id field (it must be present in all files).

(Side note: I don't think this can be done by blindly using cbind or bind_cols, but even if we could it would be a little unsafe if any file is updated in the absence of the others.)

Reproducible example.

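library(data.table)
dir.create("/tmp/pqdir")
# add an integer row id to mtcars and move it to the first column
mt <- as.data.table(mtcars)
mt[, id := .I]
setcolorder(mt, "id")
# write one parquet file per column, each holding `id` plus that column
for (nm in names(mt)[-1]) arrow::write_parquet(mt[, .SD, .SDcols=c("id",nm)], file.path("/tmp/pqdir/", paste0(nm, ".pq")))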

At this point, we have a set of parquet files where each of them has id and one other column. The fact that it is always "one other column" is purely coincidental; this method works just as well if one parquet file has 100 columns and the others have differing numbers. The only true necessity is that they share a common id to merge on.
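To make the "more than one column per file" case concrete, here is a minimal sketch that writes the columns in groups instead of one at a time. The output directory, the chunk size of 5, and the part-N.parquet naming are assumptions chosen to mirror the layout in the question, not anything arrow requires.

```r
library(data.table)

mt <- as.data.table(mtcars)
mt[, id := .I]
setcolorder(mt, "id")

# Hypothetical output directory and chunk size, chosen for illustration only.
out_dir <- file.path(tempdir(), "pqdir_chunked")
dir.create(out_dir, showWarnings = FALSE)
chunk_size <- 5

# Split the non-id column names into consecutive groups of `chunk_size`.
value_cols <- setdiff(names(mt), "id")
groups <- split(value_cols, ceiling(seq_along(value_cols) / chunk_size))

# Write one parquet file per group; every file carries the shared `id` column.
for (i in seq_along(groups)) {
  arrow::write_parquet(
    mt[, .SD, .SDcols = c("id", groups[[i]])],
    file.path(out_dir, sprintf("part-%d.parquet", i))
  )
}

written <- list.files(out_dir)
```

With the 11 columns of mtcars this gives three files (5, 5 and 1 value columns). In the real workflow each group would be read, joined to the index, written, and removed from memory before the next group is read.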

Two such files, for clarity:

library(dplyr)
arrow::open_dataset("/tmp/pqdir/am.pq") %>%
  head(3) %>%
  collect()
#       id    am
#    <int> <num>
# 1:     1     1
# 2:     2     1
# 3:     3     1
arrow::open_dataset("/tmp/pqdir/cyl.pq") %>%
  head(3) %>%
  collect()
#       id   cyl
#    <int> <num>
# 1:     1     6
# 2:     2     6
# 3:     3     4

I should say that I'm using dplyr to demonstrate this. While I recognize you said you are using data.table, the addition of dplyr allows us to do "lazy" pulls, only bringing in data at the end.

I suggest that we lazily open each of the parquet files and lazily join them together. I'll use a combination of dplyr::full_join within a base::Reduce (I'm guessing purrr::reduce would work also).

combined <- lapply(list.files("/tmp/pqdir", full.names=TRUE)[c(1,3,5,6)], 
                   arrow::open_dataset) |>
  Reduce(function(prev, this) full_join(prev, this, by = "id"), x = _)
combined
# FileSystemDataset (query)
# id: int32 (coalesce(id.x, id.y))
# am: double
# cyl: double
# drat: double
# gear: double
# See $.data for the source Arrow object

I didn't collect this yet just to demonstrate that while we have accessed and started the join process on the four (arbitrarily-chosen) files, we have not yet pulled the data, preserving some wiggle room since you're dealing with millions of rows.

From here, we can collect:

combined %>%
  collect()
#        id    am   cyl  drat  gear
#     <int> <num> <num> <num> <num>
#  1:     1     1     6  3.90     4
#  2:     2     1     6  3.90     4
#  3:     3     1     4  3.85     4
#  4:     4     0     6  3.08     3
#  5:     5     0     8  3.15     3
#  6:     6     0     6  2.76     3
#  7:     7     0     8  3.21     3
#  8:     8     0     4  3.69     4
#  9:     9     0     4  3.92     4
# 10:    10     0     6  3.92     4
# ---                              
# 23:    23     0     8  3.15     3
# 24:    24     0     8  3.73     3
# 25:    25     0     8  3.08     3
# 26:    26     1     4  4.08     4
# 27:    27     1     4  4.43     5
# 28:    28     1     4  3.77     5
# 29:    29     1     8  4.22     5
# 30:    30     1     6  3.62     5
# 31:    31     1     8  3.54     5
# 32:    32     1     4  4.11     4

The biggest drawback to this is that each time you load the data, you'll be doing multiple joins; this may or may not be a concern for you.
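If the repeated joins do become a concern, one option (an assumption on my part, not something I have benchmarked at 29 million rows) is to run the join once and write the result back out with arrow::write_dataset, so later reads open a single wide dataset. The directory names below are hypothetical, and the join itself still has to fit in memory once.

```r
library(dplyr)

# Hypothetical scratch directory with two single-column parquet files keyed by `id`.
pq_dir <- file.path(tempdir(), "pqdir_cache")
dir.create(pq_dir, showWarnings = FALSE)
mt <- data.frame(id = seq_len(nrow(mtcars)), mtcars, row.names = NULL)
for (nm in c("am", "cyl")) {
  arrow::write_parquet(mt[, c("id", nm)], file.path(pq_dir, paste0(nm, ".pq")))
}

# Lazily open and join, exactly as in the main example.
tables <- lapply(file.path(pq_dir, c("am.pq", "cyl.pq")), arrow::open_dataset)
combined <- Reduce(function(prev, this) full_join(prev, this, by = "id"), tables)

# Write the joined query to a new (hypothetical) directory without collecting into R first.
wide_dir <- file.path(tempdir(), "pq_wide")
arrow::write_dataset(combined, wide_dir, format = "parquet")

# Later sessions can open the wide dataset directly, with no joins.
wide <- arrow::open_dataset(wide_dir) %>% collect()
```

Whether this is worth it depends on how often the same set of columns is needed together; for ad hoc column selections the lazy join is probably simpler.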

Some ways you can update this process:

  • filter each parquet file individually or all using the same filtering (perhaps on id?); and
  • use one of the other *_join verbs, in case your filtering is unique to whichever columns are present in each of the files you open.
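A minimal sketch of both ideas together, assuming the same "id plus one column" files as above (recreated here in a hypothetical temp directory so the snippet runs on its own): filter one table lazily on id, then use left_join so only the surviving ids are carried through.

```r
library(dplyr)

# Hypothetical scratch directory with one parquet file per column, each keyed by `id`.
pq_dir <- file.path(tempdir(), "pqdir_filter")
dir.create(pq_dir, showWarnings = FALSE)
mt <- data.frame(id = seq_len(nrow(mtcars)), mtcars, row.names = NULL)
for (nm in c("am", "cyl", "drat")) {
  arrow::write_parquet(mt[, c("id", nm)], file.path(pq_dir, paste0(nm, ".pq")))
}

# Open lazily; filter the first table on `id` before any join happens.
am   <- arrow::open_dataset(file.path(pq_dir, "am.pq")) %>% filter(id <= 10L)
cyl  <- arrow::open_dataset(file.path(pq_dir, "cyl.pq"))
drat <- arrow::open_dataset(file.path(pq_dir, "drat.pq"))

# left_join keeps only the ids that survived the filter on the left-hand table.
subset_wide <- am %>%
  left_join(cyl, by = "id") %>%
  left_join(drat, by = "id") %>%
  collect() %>%
  arrange(id)
```

Nothing is pulled into R until collect(), so only the filtered rows of the three columns end up in memory.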

I loaded just four; you have the option of loading as few or as many as you need.

I do suggest that the columns you're going to use most of the time should likely be in one parquet file, with the other columns joined in as "options". Over to you for contextual memory-management :-)

r2evans