I've just been playing with this, and you might just need to repartition
your dataframe so that the computation can run in parallel.
I start by generating some data:
import csv
import random

with open('tmp.csv', 'w') as fd:
    out = csv.writer(fd)
    out.writerow(['id', 'number'])
    for i in range(500_000):
        a = random.randrange(1000, 2999)
        b = random.randrange(100_000, 899_999)
        out.writerow([i+1, f'+44 {a} {b}'])
Note that these are mostly valid UK numbers.
I then run something similar to your code:
from dask.distributed import Client
import dask.dataframe as dd
import phonenumbers

def fn(num):
    return phonenumbers.is_valid_number(phonenumbers.parse(num))

with Client(processes=True):
    df = dd.read_csv('tmp.csv')

    # repartition to increase parallelism
    df = df.repartition(npartitions=8)

    df['valid'] = df.number.apply(fn, meta=('valid', 'object'))

    out = df.compute()
This takes ~20 seconds to complete on my laptop (4 cores, 8 threads, Linux 5.2.8), which is only a bit more than twice as fast as the plain loop. That suggests dask has quite a bit of runtime overhead, as I'd expect it to be much faster than that. If I remove the call to repartition,
it takes longer than I'm willing to wait, and top
only shows a single process running.
Note that if I rewrite it to do the naive thing with multiprocessing,
I get much better results:
from multiprocessing import Pool
import pandas as pd

df = pd.read_csv('tmp.csv')

# fn is the validation function defined above
with Pool(4) as pool:
    df['valid'] = pool.map(fn, df['number'])
This reduces the runtime to ~11 seconds and, as a bonus, is even less code.