
I am confused about why my dask program produces no output: it simply hangs after I submit it. I have told dask to use processes instead of threads (as suggested in "dask computation not executing in parallel"), and I can see all of the cores fire up on submission, so it appears to be computing, but it never finishes. I am just trying to run a simple regex over a list of long text files. Am I missing something obvious?

import re
from os import listdir

import dask.bag as db
import dask.multiprocessing
dask.set_options(get=dask.multiprocessing.get)


loc = 'D:\\...\\text_files\\'
txts = [loc + i for i in listdir(loc)[:10]]

#  Load data in parallel
f = db.from_filenames(txts)
f = f.repartition(3)

# Define the regex
regex = re.compile(r'\b[A-Z][a-z]+\b')

# create function to parallelize
def reg(text):
    return regex.findall(text)

# distribute the function over cores
output = f.map(reg).compute().concat()
Adam P.
  • Why the call to repartition? – MRocklin Apr 13 '16 at 23:57
  • Please enlighten me … which part of the posted source is supposed to generate output? You are not just simply missing a `print output` after the last line? – Alfe Apr 14 '16 at 00:06
  • Partition: the default partition count is 100. I just wanted to use 3 threads, so I assumed that was how you align threads with partitions. Does it make sense to create more partitions than threads? Perhaps as a hack for load balancing? Print: no, I tried to print, but the program never completes once I call the .compute() method. – Adam P. Apr 14 '16 at 02:05

1 Answer


Two suggestions:

  1. Drop the call to repartition. Repartitioning instantiates the data and then tries to move it between processes, which is generally expensive. Trust the defaults provided by the system; it will only use as many processes as you have cores.

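  2. Call .compute() only at the very end of your computation. Once .compute() runs, the result is an ordinary in-memory Python object rather than a lazy bag, so any step written after it no longer goes through dask. You probably want to replace the first line below with the second: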

    output = f.map(reg).compute().concat()  # before
    output = f.map(reg).concat().compute()  # after
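
To make the intended result concrete, here is a minimal serial sketch of what the map-then-concat pipeline is meant to produce, using only the standard library and no dask. The sample strings in `texts` are hypothetical stand-ins for the elements of the bag; the regex and the `reg` function are the ones from the question.

```python
import re
from itertools import chain

# Same pattern as in the question: capitalized words.
regex = re.compile(r'\b[A-Z][a-z]+\b')


def reg(text):
    """Return every capitalized word found in one piece of text."""
    return regex.findall(text)


# Hypothetical stand-ins for the text held in the bag.
texts = ["Alice met Bob in Paris.", "no capitals here", "Then Carol left."]

# Step 1 (the role of .map(reg)): one list of matches per element.
per_text = [reg(t) for t in texts]

# Step 2 (the role of .concat()): flatten those lists into one sequence.
output = list(chain.from_iterable(per_text))

print(output)
```

In the dask version both steps stay lazy and `.compute()` comes last, so the flattening is part of the scheduled work rather than something attempted on the already computed result.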
    
MRocklin