
I have a medium-sized file (~300MB) containing a list of individuals (~300k) and the actions they performed. I'm trying to apply an operation to each individual using groupby and the parallelized version of apply described here. It looks something like this:

import multiprocessing

import pandas as pd
from joblib import Parallel, delayed

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

df = pd.read_csv(src)
patients_table_raw = applyParallel(df.groupby('ID'), f)

But unfortunately this consumes a huge amount of memory. I think it is related to the fact that this simple command:

list_groups = list(df.groupby('ID'))

consumes several GB of memory! How should I proceed? My initial thought was to iterate over the groupby in small 'chunks', so as not to consume too much memory, but I didn't find a way to do so without casting it to a list.
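
Something along these lines is what I mean by small 'chunks' (just a sketch of the idea, not code I have working; f is the per-group function):

import numpy as np
import pandas as pd

# process the individuals in chunks of IDs instead of materializing every group at once
all_ids = df['ID'].unique()
results = []
for id_chunk in np.array_split(all_ids, 100):   # roughly 1% of the individuals per chunk
    sub = df[df['ID'].isin(id_chunk)]
    results.extend(f(group) for name, group in sub.groupby('ID'))
patients_table_raw = pd.concat(results)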

More detailed context

I have a simple CSV dataset in the following fashion:

| ID | Timestamp | Action |
|----|-----------|--------|
| 1  | 0         | A      |
| 1  | 10        | B      |
| 1  | 20        | C      |
| 2  | 0         | B      |
| 2  | 15        | C      |
         ...

What I'm basically trying to do is create a different table that contains, for each individual, their ID and a description of their sequence of actions/timestamps. This will help me retrieve the individuals later:

| ID | Description |
|----|-------------|
| 1  | 0A10B20C    |
| 2  | 0B15C       |
         ...

In order to do so, and to follow a Pythonic way, my idea was basically to load the first table into a pandas DataFrame, group it by ID, and apply a function to each group that returns one row of the table I want (one per ID). However, I have LOTS of individuals in my dataset (around 1 million), and the groupby operation was extremely expensive (without explicit garbage collection, as I mention in my own answer). Also, parallelizing the groupby implied significant memory use, because apparently some things get duplicated.
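
For concreteness, the function applied to each group looks roughly like this (a simplified sketch; the real description logic is more involved):

import pandas as pd

def f(group):
    # concatenate timestamp/action pairs into a single string, e.g. "0A10B20C"
    description = ''.join(group['Timestamp'].astype(str) + group['Action'])
    return pd.DataFrame({'ID': [group['ID'].iloc[0]],
                         'Description': [description]})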

Therefore, the more detailed question is: how can I use groupby (and thereby make the data processing faster than implementing a loop of my own) without this huge memory overhead?

Manoel Ribeiro
  • There is an alternative! Use Apache Spark. :D – Alberto Bonsanto Sep 02 '16 at 01:15
  • Hahaha, it is supposed to be a one-off thing, so I didn't want to get fancy! But a reasonable approach! Thanks – Manoel Ribeiro Sep 02 '16 at 01:16
  • Do you actually need parallelization on a file that size? It is a lot easier to just work in serial. If you do need it, look at `dask`. – chrisb Sep 02 '16 at 01:23
  • `dask` is exactly designed to address your issue by keeping exactly the way you work with your dataframes. No additional Java Spark fanciness to add. – Zeugma Sep 02 '16 at 02:58
  • I agree with @chrisb - I don't think you need to parallelize your computations - very often it will be slower when using Pandas. Can you post a question with a sample data set, a short description of what you are trying to achieve, and the desired data set - in this case the SO community might be able to find an optimal solution – MaxU - stand with Ukraine Sep 02 '16 at 03:37
  • @MaxU tried to explain a bit more! Hopefully there exists a more elegant solution than mine. – Manoel Ribeiro Sep 02 '16 at 04:05
  • @ManoelRibeiro, I've added an answer - could you check it with your real data? – MaxU - stand with Ukraine Sep 02 '16 at 04:15

2 Answers


Some comments and then the solution I've found:

  • I've tried dask and it didn't make much difference. I guess it is because the file is not big enough to need out-of-core (on-disk) processing.

  • The memory performance improves significantly if you perform the garbage collection inside the function you apply to the groups. I managed to do so with a simple gc.collect() that happens every 10,000 iterations (its placement is sketched after this list). Something like:

    # x is the group passed to the applied function; requires `import gc`
    if x['ID'].head(1).values[0] % 10000 == 0:
        gc.collect()

  • The garbage collection actually made my parallel version run. But the return pd.concat(retLst) was another huge bottleneck, and consumed tons of memory!
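
For context, the collection call sits inside the applied function, roughly like this (a sketch: the per-group work itself is just a placeholder and numeric IDs are assumed):

    import gc

    import pandas as pd

    def f(group):
        # the real per-group work goes here; a trivial placeholder row:
        row = pd.DataFrame({'ID': [group['ID'].head(1).values[0]]})
        # force a collection roughly every 10,000 groups, using the numeric ID as a cheap counter
        if group['ID'].head(1).values[0] % 10000 == 0:
            gc.collect()
        return row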

My final solution was to parallelize the problem in an outer fashion:

  • I created a function that performs the groupby and the apply for individuals with IDs inside a range [X, Y].

  • I simply create a pool and run those ranges in parallel. Each process saves a file with a different name, depending on its range:

    import functools
    from multiprocessing import Pool, cpu_count

    f = functools.partial(make_patient_tables2, src="in", dest="out")
    range_of = [(0, 10000), (10000, 20000), (20000, 30000)]
    with Pool(cpu_count()) as p:
        ret_list = p.map(f, range_of)
    
  • Last but not least, I concatenate all the generated files.

Notice that this is still somewhat memory intensive, as we have to replicate the reading of the table (which is done inside make_patient_tables2, but would happen anyway, since multiprocessing doesn't share resources). A better solution would therefore involve sharing resources, but the garbage collector + not using the concat + replicating the original data only 2-3 times was enough for me!
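
make_patient_tables2 itself is not shown; roughly, it does something like the following (a sketch: the file-naming scheme is assumed and the per-group logic is simplified compared to the real one):

    import pandas as pd

    def make_patient_tables2(id_range, src, dest):
        lo, hi = id_range
        # each worker re-reads the source table: the replication mentioned above
        df = pd.read_csv(src)
        chunk = df[(df['ID'] >= lo) & (df['ID'] < hi)]
        # build one description row per individual in this ID range
        out = (chunk.groupby('ID')
                    .apply(lambda g: ''.join(g['Timestamp'].astype(str) + g['Action']))
                    .to_frame('Description')
                    .reset_index())
        # each process writes its own file, named after its ID range
        out.to_csv('{}_{}_{}.csv'.format(dest, lo, hi), index=False)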

Certainly not pretty. Hope it can be of help to someone else.

Manoel Ribeiro

Try this (without parallelization):

In [87]: df
Out[87]:
   ID  Timestamp Action
0   1          0      A
1   1         10      B
2   1         20      C
3   2          0      B
4   2         15      C

In [88]: df.set_index('ID').astype(str).sum(axis=1).groupby(level=0).sum().to_frame('Description').reset_index()
Out[88]:
   ID Description
0   1    0A10B20C
1   2       0B15C
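
For readability, the same one-liner can be spelled out step by step (an equivalent sketch, not a different method):

s = df.set_index('ID').astype(str)           # Timestamp and Action as strings, indexed by ID
row_desc = s['Timestamp'] + s['Action']      # one string per row: "0A", "10B", ...
result = (row_desc.groupby(level=0).sum()    # string-concatenate the rows within each ID
                  .to_frame('Description')
                  .reset_index())
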
MaxU - stand with Ukraine
  • Hey! I've tried it, and it is efficient. Curiously, the garbage collection problem doesn't happen with your approach. However, this approach is not totally generalizable (in the real dataset the descriptions and tables are a bit more complicated), so you kind of "need" the apply, as the mere sum won't solve it. I wonder if applying the generic function creates all the problems I've had. – Manoel Ribeiro Sep 02 '16 at 04:32
  • @ManoelRibeiro, `.apply()` in general is pretty inefficient, so we want to avoid it if possible – MaxU - stand with Ukraine Sep 02 '16 at 04:36