0

I have following code:

#!/usr/bin/env python

def do_job(row):
  # COMPUTING INTENSIVE OPERATION
  sleep(1)
  row.append(int(row[0])**2)

  # WRITING TO FILE - ATOMICITY ENSURED
  semaphore.acquire()
  print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
  csvWriter.writerow(row)
  print "Inside semaphore after writing to file"
  semaphore.release()

  # RETURNING VALUE
  return row

def parallel_csv_processing(inputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # COMPUTING INTENSIVE OPERATIONS
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job, csvReader, chunksize = 10)
    results = p.map_async(do_job, csvReader, chunksize = 10)

  except KeyboardInterrupt:
    p.close()
    p.terminate()
    p.join()

  # WAIT FOR RESULTS
  # results.get()
  p.close()
  p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

if __name__ == '__main__':
  import csv
  from time import sleep
  from multiprocessing import Pool
  from multiprocessing import cpu_count
  from multiprocessing import current_process
  from multiprocessing import Semaphore
  from pprint import pprint as pp
  import calendar
  import time

  SCRIPT_START_TIME  = calendar.timegm(time.gmtime())
  inputFile  = "input.csv"
  outputFile = "output.csv"
  semaphore = Semaphore(1)

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')
  csvWriter.writerow(["before","calling","multiprocessing"])
  parallel_csv_processing(inputFile, cpuCount = cpu_count())
  csvWriter.writerow(["after","calling","multiprocessing"])

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  SCRIPT_STOP_TIME   = calendar.timegm(time.gmtime())
  SCRIPT_DURATION    = SCRIPT_STOP_TIME - SCRIPT_START_TIME
  print "Script duration:    %s seconds" % SCRIPT_DURATION

After running the output on terminal is following:

Inside semaphore before writing to file: (0,0,0)
Inside semaphore after writing to file
Inside semaphore before writing to file: (1,3,1)
Inside semaphore after writing to file
Inside semaphore before writing to file: (2,6,4)
Inside semaphore after writing to file
Inside semaphore before writing to file: (3,9,9)
Inside semaphore after writing to file
Inside semaphore before writing to file: (4,12,16)
Inside semaphore after writing to file
Inside semaphore before writing to file: (5,15,25)
Inside semaphore after writing to file
Inside semaphore before writing to file: (6,18,36)
Inside semaphore after writing to file
Inside semaphore before writing to file: (7,21,49)
Inside semaphore after writing to file
Inside semaphore before writing to file: (8,24,64)
Inside semaphore after writing to file
Inside semaphore before writing to file: (9,27,81)
Inside semaphore after writing to file
Script duration:    10 seconds

content of input.csv is following:

0,0
1,3
2,6
3,9
4,12
5,15
6,18
7,21
8,24
9,27

created content of output.csv is following:

before,calling,multiprocessing
Default,header,please,change
after,calling,multiprocessing

Why Is nothing written to output.csv from parallel_csv_processing resp. do_job method?

Wakan Tanka
  • 7,542
  • 16
  • 69
  • 122

1 Answers1

1

Your processes are silently failing with an exception - specifically, in the spawned processes the script doesn't have a value for csvWriter because they are each in a separate python interpreter, and haven't run main() - this is deliberate, you don't want the subprocesses to run main. The do_job() function can only access values you pass to it explicitly in the map_async() call, and you aren't passing csvWriter. Even if you were I'm not sure it would work, don't know if file handles are shared between main and the processes created by multiprocessing.

Put a try/except around the code in do_job and you will see the exception.

def do_job(row):
  try:
      # COMPUTING INTENSIVE OPERATION
      sleep(1)
      row.append(int(row[0])**2)

      # WRITING TO FILE - ATOMICITY ENSURED
      semaphore.acquire()
      print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
      csvWriter.writerow(row)
      print "Inside semaphore after writing to file"
      semaphore.release()

      # RETURNING VALUE
      return row
  except:
      print "exception"

Obviously in real code the exception should be handled properly, but if you run this you'll now see exception printed for every invocation of do_job.

Look in the documentation for multiprocessing for more guidance - under the heading "16.6.1.4. Sharing state between processes" in the Python 2.7 Standard Library docs.

  • Thank you for reply. So you are saying that the objects that are not directly passed to `map_async()` are not visible inside `do_job()` right? Then how is possible that the `semaphore` is visible? Also when I declare `global_variable = "global variable"` inside `__main__` I'm able to print it from `do_job` with `print global_variable`. Isn't this the same scenario? – Wakan Tanka Sep 07 '15 at 11:20
  • I would like to ask why is this exception silent? From my previous experiences when exception raised I was notified immediately. Is possible to explicitly say not to suppress exceptions silently? I can use `try`, `except` but how can I know which part of code is causing problems? Is there some global settings for this purpose? – Wakan Tanka Sep 07 '15 at 11:21
  • There's no cPython implementation of a way of getting the exception back from the different process to the main() code - you have to implement that yourself see maybe http://stackoverflow.com/questions/6728236/exception-thrown-in-multiprocessing-pool-not-detected and http://stackoverflow.com/questions/16943404/python-multiprocessing-and-handling-exceptions-in-workers – DisappointedByUnaccountableMod Sep 07 '15 at 11:43
  • Thank you, I will check the links. Can you please also react on passing variables that I've mentioned? One possible explanation is that file handlers are not allowed to share between processes while other types are. Is this correct? Thank you – Wakan Tanka Sep 07 '15 at 11:52
  • The processes created by multiprocessing are running in completely separate memory spaces (e.g. they could be a different CPU core) - if you want a single interpreter/memory space use Threading which has very similar API but doesn't use OS processes, so must run in a single CPU core. Read the Python Standard Library docs for multiprocessing, try it out. – DisappointedByUnaccountableMod Sep 07 '15 at 11:58
  • I understand that they are running on different CPU this is what I want. But I am asking how is possible that some variables are shared and some not. I know `Threading` (not in details but found it during my research). But I'm in the situation where I have 6 CPU and during the computing intensive task only 1 is working (I have load 1.0 instead of 6.0) So I decided to use `multiprocessing` instead of `Threading` I suppose this is exactly the purpose of `multiprocessing`. Thank you – Wakan Tanka Sep 07 '15 at 12:06