1

I've noticed a significant performance degradation with the following script when increasing my cluster size: from a single node to 3 node cluster. Running times are 2min and 6min respectively. Also, noticed CPU activity is very low in both cases.

The task here is a word count over a single text file (10GB, 100M lines, ~2B words). I've made the file available to all nodes prior to launching the script.

What could possibly impede Dask from scaling this out?

from dask.distributed import Client
import dask.dataframe as dd

df = dd.read_csv(file_url, header=None)
# count words
new_df = (
    df[0]
    .str
    .split()
    .explode()
    .value_counts()
)
print(new_df.compute().head())
SultanOrazbayev
  • 14,900
  • 3
  • 16
  • 46
w00dy
  • 748
  • 1
  • 6
  • 23
  • 2
    this is a really tough workflow to scale. you're counting unique instances of 2billion unique words. unique value counts are a tough one for distributed processing in general. so if you *can* fit this in memory, it's going to perform much better in a single thread than on a distributed scheduler. usually, unless your operation is heavily CPU-limited, if you are able to make the comparison of in-memory vs. dask, dask is going to lose. dask really shines when you can't do the operation in memory. – Michael Delgado Jun 03 '22 at 21:35
  • 2
    I haven't profiled anything like this, but i'm guessing the vast majority of your runtime is I/O and array resizing. the split.explode() seems particularly painful, and then dask has to do a ton of extra work to syncronize value_counts across partitions. all of these are way easier in a single thread. you'll get some speedup just parsing the csv in parallel, but it seems like this doesn't compensate for the big increase in overhead, and to me this seems reasonable. have you seen this perfom well on a different distributed processing engine? – Michael Delgado Jun 03 '22 at 21:43

1 Answers1

1

One potential problem is the communication that arises when you have many workers sending value counts on a high cardinality variable. The core issue is somewhat similar to the one discussed here: even if memory is not a bottleneck for the workers, they still will end up passing around (potentially) large counters, which can be very slow due to (de)serialization and transmission over the network.

To test if this is the issue, you can try creating a low cardinality file, e.g. using terminal yes | head -1000000 > test_low_cardinality.csv, and testing your snippet on this file.

SultanOrazbayev
  • 14,900
  • 3
  • 16
  • 46