
Possible Duplicate:
Python multiprocessing pool.map for multiple arguments

I'd like to feed two arguments to the subprocesses in a multiprocessing.Pool, and I feel like I'm pushing uphill. Is it possible as two args, or as a tuple, or... at all? It seems to work fine for a while, correctly passing out the two filenames (in and out), but then it barfs unexpectedly at a variable point. Sadly it's not really working, because the output files are all empty - which is not what happens if I call the routine directly, or single-processedly. There is another complicating factor: the called routine is in another, imported, module. Localising that as a 'foo' stub module does fix things, but then it's only printing the args, not trying to do any actual work.
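
To make the question concrete, here's the shape of what I'm after (a toy sketch, not my real code; foo and the filenames are made up):

    import multiprocessing

    def foo(infile, outfile):  # two parameters is what I'd *like* to write
        print infile, outfile

    if __name__ == '__main__':
        pool = multiprocessing.Pool()
        # pool.map hands exactly one argument to each call, so there's no
        # obvious way to feed foo two; passing tuples instead blows up with:
        # TypeError: foo() takes exactly 2 arguments (1 given)
        pool.map(foo, [('in1.txt', 'out1.txt'), ('in2.txt', 'out2.txt')])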

This is perhaps a long-winded way of stubbornly resisting learning how to use a Queue, but I'd just like confirmation that I'm not going to get anywhere pushing down the path I'm on.

fixtures/txt_data/AAD.txt obj/txt_data/AAD.txt
fixtures/txt_data/ANZSMW.txt obj/txt_data/ANZSMW.txt
fixtures/txt_data/BENPA.txt obj/txt_data/BENPA.txt
fixtures/txt_data/CBAIZQ.txt obj/txt_data/CBAIZQ.txt
Traceback (most recent call last):
  File "./jobflow.py", line 60, in <module>
    main()
  File "./jobflow.py", line 57, in main
    args.func(args)
  File "./jobflow.py", line 40, in market
    pool.map(foo, market_files())
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
  return self.map_async(func, iterable, chunksize).get()
File     "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
raise self._value
TypeError: function takes exactly 1 argument (2 given)
fixtures/txt_data/CSDO.txt obj/txt_data/CSDO.txt
fixtures/txt_data/EMB.txt obj/txt_data/EMB.txt
fixtures/txt_data/GGG.txt obj/txt_data/GGG.txt
fixtures/txt_data/IDL.txt obj/txt_data/IDL.txt

That's a sample of the error. It halts before the files are exhausted. Alternatively, it moans that it wants 2 args but only got 1 when I change foo to try to accept two args:

def foo(c):
    a, b = c
    print a, b
    market2.file_main((a, b))  # does commenting/uncommenting this break it only because it's in another python file?

def market(args):
    """
    read raw ticker data files and output nice, clean, more valid ticker data files
    """
    pool = multiprocessing.Pool()

    def market_files():
        for infile in args.infiles:
            outfile = os.path.join(args.outdir, os.path.basename(infile))
            yield (infile, outfile)

    pool.map(foo, market_files())

1 Answer


Oh wait, it does work - not by passing multiple args directly, but by putting them into a tuple.

I'd implemented it by spawning a new Process and calling p.start() on every iteration, which generated a ridiculous number of processes ;-) but it did swallow the multiple arguments.
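
That earlier version looked roughly like this (a reconstructed sketch, not the exact code; work stands in for the real worker):

    import multiprocessing

    def work(infile, outfile):  # stand-in for the real per-file worker
        print infile, outfile

    if __name__ == '__main__':
        pairs = [('in1.txt', 'out1.txt'), ('in2.txt', 'out2.txt')]
        procs = []
        for infile, outfile in pairs:
            # Process(args=...) happily accepts multiple arguments...
            p = multiprocessing.Process(target=work, args=(infile, outfile))
            p.start()  # ...but this launches one process per pair
            procs.append(p)
        for p in procs:
            p.join()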

Working back from there, I simplified the iterable into a list (now that I've got it worked out, an iterable is probably fine), but I think the main thing was passing the args as a tuple. It must have been one of those cases of having too much mess on the cutting-room floor to see the solution that had already worked.

So in the controller I have:

    # Create a list of (infile, outfile) filename pairs.
    arglist = []
    for infile in args.infiles:
        outfile = os.path.join(args.outdir, os.path.basename(infile))
        arglist.append((infile, outfile))

    # Pass each worker one pair of filenames to work on.
    pool = multiprocessing.Pool()
    p = pool.map(func=market2.process, iterable=arglist)

And in the module:

    def process(x):
        # Open an input file, and output file, and do work.
        infile, outfile = x
        instream = open(infile, 'rb')
        outstream = open(outfile, 'wb')
        main(instream, outstream)
        instream.close()
        outstream.close()
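
An aside: if you're on Python 3.3 or later, Pool.starmap unpacks each tuple for you, so the worker can take two real parameters. A minimal sketch, with made-up filenames:

    import multiprocessing

    def process(infile, outfile):  # two real parameters, no manual unpacking
        print(infile, outfile)

    if __name__ == '__main__':
        pairs = [('in1.txt', 'out1.txt'), ('in2.txt', 'out2.txt')]
        pool = multiprocessing.Pool()
        pool.starmap(process, pairs)  # starmap is Python 3.3+
        pool.close()
        pool.join()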

Performance on 4 cores (min:sec):

  • single-threaded = 3:54
  • using subprocess = 4:52 (I think it blocks by default, so that would figure)
  • using squillions of Process objects simultaneously = 2:41 (saturated all cores, with 1-4% CPU per process)
  • using Pool = 2:13
  • Edit: lols. Methinks it was looping over everything, pushing the whole lot through each time - so running the cartesian product of what was really wanted. Lemme fix all that! – John Mee Oct 22 '12 at 10:08