
Is it possible in python (maybe using dask, maybe using multiprocessing) to 'emplace' generators on cores, and then, in parallel, step through the generators and process the results?

It needs to be generators in particular (or objects with `__iter__`); lists of all the elements the generators yield won't fit into memory.

In particular:

With pandas, I can call read_csv(...iterator=True), which gives me an iterator (TextFileReader) - I can iterate over it with for, or explicitly call next on it multiple times. The entire CSV never gets read into memory. Nice.

Every time I read a next chunk from the iterator, I also perform some expensive computation on it.
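A minimal sketch of that single-file pattern (expensive_process and the chunk size here are placeholders for my actual computation):

import pandas as pd

# read one chunk at a time; each chunk is a pandas DataFrame
reader = pd.read_csv("file1.csv", iterator=True, chunksize=500)
for chunk in reader:
    result = expensive_process(chunk)  # placeholder for the real work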

But now I have 2 such files. I would like to create 2 such generators, and 'emplace' 1 on one core and 1 on another, such that I can:

 result = expensive_process(next(iterator))

on each core, in parallel, and then combine and return the results. Repeat this step until one or both generators are exhausted.

It looks like the TextFileReader is not pickleable, nor is a generator. I can't figure out how to do this in dask or multiprocessing. Is there a pattern for this?

Colin
  • Can you be more specific - do you have two CSV files (of the same format maybe - or at least compatible columns or some such) - that you'd like to apply some aggregation function to? What is the nature of `expensive_process` for instance? – Jon Clements Nov 16 '18 at 00:54
  • Well, to tell the truth I don't have 2 (yes, identically formatted csvs) - I actually have 300+. And 'expensive_process' means just that - in this case, turn one of the csv columns from string to arrays of numbers, and then render those numbers. So, some expensive function applied to each row of the csv. I'm trying to avoid concatenating many slices, and then sending off the result for parallel processing - that'll be a lot of data transfer... – Colin Nov 16 '18 at 03:20

2 Answers


Dask's read_csv is designed to load data from multiple files in chunks, with a chunk-size that you can specify. When you operate on the resultant dataframe, you will be working chunk-wise, which is exactly the point of using Dask in the first place. There should be no need to use your iterator method.

The dask dataframe method you will want to use, most likely, is map_partitions().
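A minimal sketch of that approach (the file names, blocksize and expensive_process are stand-ins for your own):

import dask.dataframe as dd

ddf = dd.read_csv(["f1.csv", "f2.csv"], blocksize=100000)

# expensive_process is called once per partition (a pandas DataFrame),
# so only a handful of chunks are in memory on each worker at a time
processed = ddf.map_partitions(expensive_process)
result = processed.compute()  # or write out with to_parquet/to_csv instead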

If you really wanted to use the iterator idea, you should look into dask.delayed, which is able to parallelise arbitrary python functions, by sending each invocation of the function (with a different file-name for each) to your workers.
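Roughly (process_one_file and the file names are placeholders, assuming each file can be processed independently):

import dask
from dask import delayed

# each call is recorded lazily; nothing runs until dask.compute()
tasks = [delayed(process_one_file)(fn) for fn in ["f1.csv", "f2.csv"]]
results = dask.compute(*tasks)  # the calls run in parallel on the workers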

mdurant
  • Hm, I'm confused. Right now if I have two files and call `dask.dataframe.read_csv([f1, f2], blocksize=100000)` I get 2219 partitions - the first ~1000 are from f1, the rest from f2. What I'd like to do is step through f1 and f2 in parallel: the first 500 rows of f1 and f2, the 2nd 500 rows of f1 & f2, etc... If I call map_partitions, doesn't it immediately step through all of the partitions? This is because I can't fit all the results of mapping in memory at the same time - I have to consume them a batch at a time... – Colin Nov 18 '18 at 20:33

So luckily I think this problem maps nicely onto Python's `multiprocessing.Process` and `multiprocessing.Queue`.

from multiprocessing import Process, Queue

# NumCores, size, whatever, something, and some_combination_of are
# placeholders for your own configuration and per-chunk processing.

def data_generator(whatever):
   for v in something(whatever):
      yield v

def generator_constructor(whatever):
   # The returned closure runs in a child process and pushes each
   # yielded item onto its queue. (Nested functions aren't picklable,
   # so this relies on the 'fork' start method, i.e. Linux defaults.)
   def generator(outputQueue):
      for d in data_generator(whatever):
         outputQueue.put(d)
      outputQueue.put(None)  # sentinel: this generator is exhausted
   return generator

def procSumGenerator():
   outputQs = [Queue(size) for _ in range(NumCores)]
   procs = [Process(target=generator_constructor(whatever),
                    args=(outputQs[i],))
            for i in range(NumCores)]

   for proc in procs: proc.start()

   # until any output queue returns a None sentinel, collect one
   # item from every queue and yield the combined result
   done = False
   while not done:
      results = [oq.get() for oq in outputQs]
      done = any(res is None for res in results)
      if not done:
         yield some_combination_of(results)

   for proc in procs: proc.terminate()

for v in procSumGenerator():
   print(v)

Maybe this can be done better with Dask? I find that my solution fairly quickly saturates the network for large amounts of generated data - I'm manipulating CSVs with pandas and returning large numpy arrays.

https://github.com/colinator/doodle_generator/blob/master/data_generator_uniform_final.ipynb

Colin