
I'm processing a bunch of text-based records in CSV format using Dask, which I am learning to use to work around data too large to fit in memory, and I'm trying to filter records within groups that best match a complicated set of criteria.

The best approach I've identified so far is basically to use Dask to group records into bite-sized chunks and then write the applicable logic in Python:

import pandas as pd

def reduce_frame(partition):
    records = partition.to_dict('records')
    shortlisted_records = []

    # Use Python to locate promising-looking records.
    # Some of the criteria can be cythonized; one criterion
    # revolves around whether a record is a parent or child
    # of records already in shortlisted_records.
    for record in records:
        for other in shortlisted_records:
            if other['path'].startswith(record['path']) \
                    or record['path'].startswith(other['path']):
                ...  # keep one, possibly both
            ...

    return pd.DataFrame.from_records(shortlisted_records)

df = df.groupby('key').apply(reduce_frame, meta={...})
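
For reference, roughly how the full call could be wired up end to end; the read_csv pattern and the column names in meta below are made up for illustration, not my actual schema:

import dask.dataframe as dd
import pandas as pd

# Hypothetical input files and columns, just to show the shape of the call.
ddf = dd.read_csv('records-*.csv')

# meta describes the schema of what reduce_frame returns, so Dask can build
# the task graph without having to run the function first.
meta = pd.DataFrame({
    'path': pd.Series(dtype='object'),
    'text': pd.Series(dtype='object'),
    'score': pd.Series(dtype='float64'),
})

result = ddf.groupby('key').apply(reduce_frame, meta=meta).compute()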

In case it matters, the complicated criteria revolve around weeding out promising-looking links on a web page based on link url, link text, and css selectors across the entire group. Think: given A and B in the shortlist and C a new record, keep all three if each is very, very promising; else prefer C over A and/or B if it is more promising than either or both; else drop C. The resulting Pandas partition objects above are tiny. (The dataset in its entirety is not, hence my using Dask.)
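
To make that concrete, here is a rough sketch of that preference logic in plain Python; promise() and the 0.9 threshold are made-up stand-ins for the real url/text/css-selector scoring:

VERY_PROMISING = 0.9  # hypothetical threshold

def promise(record):
    # Hypothetical score in [0, 1]; stand-in for the real criteria built
    # from link url, link text, and css selectors.
    return record['score']

def consider(shortlisted, candidate):
    # Keep both when the candidate and an existing entry are each very
    # promising; otherwise keep whichever of the two scores higher.
    keep_candidate = True
    for other in list(shortlisted):
        if promise(candidate) >= VERY_PROMISING and promise(other) >= VERY_PROMISING:
            continue  # both very promising: keep both
        if promise(candidate) > promise(other):
            shortlisted.remove(other)  # prefer the candidate over this entry
        else:
            keep_candidate = False     # an existing entry wins; drop the candidate
            break
    if keep_candidate:
        shortlisted.append(candidate)

shortlist = []
for rec in [{'score': 0.95}, {'score': 0.2}, {'score': 0.97}]:
    consider(shortlist, rec)
# -> 0.95 and 0.97 both stay (each very promising); 0.2 is dropped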

Seeing how Pandas exposes inherently row- and column-based functionality, I'm struggling to imagine any vectorized approach to solving this, so I'm exploring writing the logic in Python.

Is the above the correct way to proceed, or are there more Dask/Pandas-idiomatic ways - or simply better ways - to approach this type of problem? Ideally one that allows me to parallelize the computations across a cluster, for instance by using Dask.bag or Dask.delayed and/or cytoolz or something else I might have missed while learning Python?

Denis de Bernardy
  • Kind of hard to know whether vectorization is possible or not without knowing the logic. But you might want to [check this out](https://stackoverflow.com/questions/26187759/parallelize-apply-after-pandas-groupby) – rafaelc Jul 24 '19 at 16:38
  • @rafaelc: nah, trust me, vectorization isn't much of an option, as far as I can tell from studying the docs and SO for the past few weeks. Think: given A and B in the shortlist and C a new record, keep all if each is very, very promising; else prefer C over A and/or B if more promising than either or both; else drop C. Re the question you linked to, the parallelization I need is before the groupby, hence my using Dask. The partitions in the apply are a few hundred records max and comfortably fit in memory. – Denis de Bernardy Jul 24 '19 at 16:41
  • Please take a look at [How to create good pandas examples](https://stackoverflow.com/questions/20109391/how-to-make-good-reproducible-pandas-examples) and provide a [mcve] for your problem including sample input and output so that we can better understand and help you – G. Anderson Jul 24 '19 at 16:43
  • @G.Anderson: I'm not looking for code, I'm looking for a Dask/Pandas expert who can confirm whether the approach is correct or if there's a better way. – Denis de Bernardy Jul 24 '19 at 16:44
  • Again, it's really difficult to say, **especially** with the groupby. While many things can be done with `groupby.apply`, there are a lot of intelligent and sneaky ways to first create the condition and then use one of the `groupby` functions that have a cythonized implementation. The former will be a slow loop, the latter orders of magnitude faster. – ALollz Jul 24 '19 at 16:46
  • @ALollz: I've edited the code in the question slightly, to highlight why I do not think it can be cythonized. – Denis de Bernardy Jul 24 '19 at 17:39

1 Answer


I know nothing about Dask, but I can say a little about passing / blocking rows using Pandas.

It is possible to use groupby(...).apply(...) to "filter" the source DataFrame.

Example: df.groupby('key').apply(lambda grp: grp.head(2)) returns the first 2 rows of each group.

In your case, write a function to be applied to each group, which:

  • contains the logic for processing the current group,
  • generates the output DataFrame based on this logic, e.g. returning only some of the input rows.

The returned rows are then concatenated, forming the result of apply.
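
A minimal, self-contained sketch of this pattern, using toy data and a toy "keep rows above the group mean" rule just to show the mechanics:

import pandas as pd

df = pd.DataFrame({
    'key':   ['a', 'a', 'a', 'b', 'b'],
    'score': [1.0, 5.0, 3.0, 2.0, 4.0],
})

def reduce_group(grp):
    # Arbitrary per-group logic: keep only rows above the group's mean score.
    return grp[grp['score'] > grp['score'].mean()]

result = df.groupby('key', group_keys=False).apply(reduce_group)
print(result)
#   key  score
# 1   a    5.0
# 3   b    4.0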

Another possibility is to use groupby(...).filter(...), but in this case the underlying function returns a decision to pass or block each whole group of rows.
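
A comparable sketch for groupby(...).filter(...); here the lambda decides on whole groups, so all rows of a group pass or are blocked together:

import pandas as pd

df = pd.DataFrame({
    'key':   ['a', 'a', 'b', 'b', 'b'],
    'score': [1.0, 2.0, 5.0, 6.0, 7.0],
})

# Keep only groups whose mean score exceeds 4.
kept = df.groupby('key').filter(lambda grp: grp['score'].mean() > 4)
print(kept)
#   key  score
# 2   b    5.0
# 3   b    6.0
# 4   b    7.0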

Yet another possibility is to define a "filtering function", say filtFun, which returns True (pass the row) or False (block the row).

Then:

  • Run: msk = df.apply(filtFun, axis=1) to generate a mask (which rows passed the filter).
  • In further processing use df[msk], i.e. only these rows which passed the filter.

But in this case the underlying function has access only to the current row, not to the whole group of rows.
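
A sketch of this third, purely row-wise variant; the data and the length test are made up, and filtFun sees only one row at a time:

import pandas as pd

df = pd.DataFrame({
    'path': ['/a', '/a/b', '/c', '/c/d/e'],
    'text': ['home', 'about us', 'login', 'contact page'],
})

def filtFun(row):
    # Row-wise decision: True passes the row, False blocks it.
    # No other row of the group is visible here.
    return len(row['text']) > 5

msk = df.apply(filtFun, axis=1)
print(df[msk])
#      path          text
# 1    /a/b      about us
# 3  /c/d/e  contact page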

Valdi_Bo
  • So, if I get this answer correctly, my hunch from the question is correct and corresponds to the 1st method you outline, and that would be the idiomatic way of doing this using something Pandas-like; and the 2nd and 3rd methods can't possibly apply because both are strictly row-based. – Denis de Bernardy Jul 24 '19 at 19:30
  • You are right. The second method was "suspicious" as the decision is made on **the whole group**. The same can be said of the third method, as the "decision function" does not see other rows. – Valdi_Bo Jul 25 '19 at 03:20