
I am looking to parallelise numpy or pandas operations. For this I have been looking into pydata's blaze. My understanding was that seamless parallelisation was its major selling point.

Unfortunately, I have been unable to find an operation that runs on more than one core. Is parallel processing in blaze available yet, or is it currently only a stated aim? Am I doing something wrong? I am using blaze v0.6.5.

Example of one function I was hoping to parallelise (deduplication of a PyTables column too large to fit in memory):

import pandas as pd
import blaze as bz

# store is a pd.HDFStore opened elsewhere
def f1():
    counter = 0
    groups = pd.DataFrame(columns=['name'])
    t = bz.TableSymbol('t', '{name: string}')
    e = bz.distinct(t)
    # read the column in chunks, accumulating the distinct values seen so far
    for chunk in store.select('my_names', columns=['name'],
                              chunksize=100000):
        counter += 1
        print('processing chunk %d' % counter)
        groups = pd.concat([groups, chunk])
        groups = bz.compute(e, groups)

Edit 1

I have had problems following Phillip's examples:

In [1]: from blaze import Data, compute

In [2]: d = Data('test.bcolz')

In [3]: d.head(5)
Out[3]: <repr(<blaze.expr.collections.Head at 0x7b5e300>) failed: NotImplementedError: Don't know how to compute:
expr: _1.head(5).head(11)
data: {_1: ctable((8769257,), [('index', '<i8'), ('date', 'S10'), ('accessDate', 'S26')])
  nbytes: 367.97 MB; cbytes: 35.65 MB; ratio: 10.32
  cparams := cparams(clevel=5, shuffle=True, cname='blosclz')
  rootdir := 'test.bcolz'
[(0L, '2014-12-12', '2014-12-14T17:39:19.716000')
 (1L, '2014-12-11', '2014-12-14T17:39:19.716000')
 (2L, '2014-12-10', '2014-12-14T17:39:19.716000') ...,
 (1767L, '2009-11-11', '2014-12-15T13:32:39.906000')
 (1768L, '2009-11-10', '2014-12-15T13:32:39.906000')
 (1769L, '2009-11-09', '2014-12-15T13:32:39.906000')]}>

My environment:

C:\Anaconda>conda list blaze
# packages in environment at C:\Anaconda:
#
blaze                     0.6.8               np19py27_69

But note that blaze seems to report the wrong version:

In [5]: import blaze

In [6]: blaze.__version__
Out[6]: '0.6.7'

With other data sources blaze seems to work:

In [6]: d = Data([1,2,2,2,3,4,4,4,5,6])

In [7]: d.head(5)
Out[7]:
   _2
0   1
1   2
2   2
3   2
4   3

In [16]: list(compute(d._2.distinct()))
Out[16]: [1, 2, 3, 4, 5, 6]
ARF

1 Answer


Note: The example below requires the latest version of blaze, which you can get via

conda install -c blaze blaze

You'll also need the latest version of the nascent into project, installed from master, which you can do with

pip install git+git://github.com/ContinuumIO/into.git

You can't do "seamless" parallelization with an arbitrary backend, but the bcolz backend supports parallelization in a nice way. Here's an example with the NYC Taxi trip/fare dataset.

Note: I've combined the trip and fare datasets into a single dataset of 173,179,759 rows.

In [28]: from blaze import Data, compute

In [29]: ls -d *.bcolz
all.bcolz/  fare.bcolz/ trip.bcolz/

In [30]: d = Data('all.bcolz')

In [31]: d.head(5)
Out[31]:
                          medallion                      hack_license  \
0  89D227B655E5C82AECF13C3F540D4CF4  BA96DE419E711691B9445D6A6307C170
1  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
2  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
3  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310
4  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310

  vendor_id  rate_code store_and_fwd_flag     pickup_datetime  \
0       CMT          1                  N 2013-01-01 15:11:48
1       CMT          1                  N 2013-01-06 00:18:35
2       CMT          1                  N 2013-01-05 18:49:41
3       CMT          1                  N 2013-01-07 23:54:15
4       CMT          1                  N 2013-01-07 23:25:03

     dropoff_datetime  passenger_count  trip_time_in_secs  trip_distance  \
0 2013-01-01 15:18:10                4                382            1.0
1 2013-01-06 00:22:54                1                259            1.5
2 2013-01-05 18:54:23                1                282            1.1
3 2013-01-07 23:58:20                2                244            0.7
4 2013-01-07 23:34:24                1                560            2.1

     ...     pickup_latitude  dropoff_longitude  dropoff_latitude  \
0    ...           40.757977         -73.989838         40.751171
1    ...           40.731781         -73.994499         40.750660
2    ...           40.737770         -74.009834         40.726002
3    ...           40.759945         -73.984734         40.759388
4    ...           40.748528         -74.002586         40.747868

   tolls_amount  tip_amount  total_amount  mta_tax  fare_amount  payment_type  \
0             0           0           7.0      0.5          6.5           CSH
1             0           0           7.0      0.5          6.0           CSH
2             0           0           7.0      0.5          5.5           CSH
3             0           0           6.0      0.5          5.0           CSH
4             0           0          10.5      0.5          9.5           CSH

  surcharge
0       0.0
1       0.5
2       1.0
3       0.5
4       0.5

[5 rows x 21 columns]

To add process-based parallelism, we bring in the Pool class from the multiprocessing module in the standard library and pass a Pool instance's map method as a keyword argument to compute:

In [32]: from multiprocessing import Pool

In [33]: p = Pool()

In [34]: %timeit -n 1 -r 1 values = compute(trip.medallion.distinct())
1 loops, best of 1: 1min per loop

In [35]: %timeit -n 1 -r 1 values = compute(trip.medallion.distinct(), map=p.map)
1 loops, best of 1: 16.2 s per loop

So, roughly a 3x speedup for an extra line of code. Note that this is a string column, and string columns tend to be very inefficient compared to other types. A distinct expression computed over an integer column finishes in about 1 second with multiple cores, versus about 3 seconds on a single core (roughly the same relative improvement):

In [38]: %timeit -n 1 -r 1 values = compute(trip.passenger_count.distinct())
1 loops, best of 1: 3.33 s per loop

In [39]: %timeit -n 1 -r 1 values = compute(trip.passenger_count.distinct(), map=p.map)
1 loops, best of 1: 1.01 s per loop
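Conceptually, what the map= hook lets blaze do is chunked split-apply-combine: compute distinct over each chunk independently (in parallel processes), then union the per-chunk results. Here's a minimal stdlib-only sketch of that pattern; the function names are illustrative, not part of the blaze API:

```python
from multiprocessing import Pool


def chunk_distinct(chunk):
    # Per-chunk stage: independent, so safe to run in parallel.
    return set(chunk)


def parallel_distinct(chunks, map=map):
    # `map` defaults to the serial builtin; pass Pool.map to parallelise,
    # mirroring blaze's compute(expr, map=p.map) keyword.
    out = set()
    for s in map(chunk_distinct, chunks):
        out |= s          # combine stage: union the per-chunk sets
    return sorted(out)    # sort for a deterministic result


if __name__ == '__main__':
    chunks = [[1, 2, 2], [2, 3, 4], [4, 5, 6]]
    with Pool() as p:
        print(parallel_distinct(chunks, map=p.map))  # [1, 2, 3, 4, 5, 6]
```

The key property is that distinct is associative over chunks, which is why only the cheap combine step has to run serially.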
Phillip Cloud
  • Thanks Phillip. That really helped! I was searching the various parts of the docs and Continuum blogs but found nothing concrete on parallel processing. Do you by any chance have an overview over which backends support parallel processing and which do not? If not, I am going to take a look at the code. Thanks in any case! – ARF Dec 16 '14 at 14:56
  • Currently, bcolz, h5py (for more nd-array-like operations) and collections of CSV files can all benefit from a pmap function. Of course, if Pandas released the GIL we could accelerate anything that uses DataFrames (elbows Phil and Jeff) :) – MRocklin Dec 16 '14 at 15:56
  • @MRocklin Great! bcolz, h5py and CSV is plenty for me... Unfortunately I keep getting `NotImplementedError: Don't know how to compute` (at least with bcolz). See Edit 1 in OP. May I impose on you both once more to point me in the right direction? I did not use `into` but created the bcolz natively. Any chance that is the reason? – ARF Dec 16 '14 at 18:08
  • Doing things natively is ideal. Into is just for convenience. I suggest updating from source (instructions in comment to the original question). Does the problem persist after that? – MRocklin Dec 16 '14 at 20:27