
I've never worked with multiprocessing before so bear with me if I'm asking a basic question.

This answer provided a very nice processing class that I adapted to my needs and it works very well. I'm trying to implement a basic progress bar which I'm testing using print statements, but it is not working at all (no output whatsoever).

My current code is this:

class ParsingMaster(object):
  def __init__(self, parser, input_file, output_file):
    self.parser = parser

    self.num_processes = cpu_count()
    self.input_file = input_file
    self.output_file = output_file

    self.input_queue = Queue()
    self.output_queue = Queue()

    self.input_size = 0

    self.input_process = Process(target=self.parse_input)
    self.output_process = Process(target=self.write_output)
    self.processes = [Process(target=self.process_row) for row in range(self.num_processes)]

    self.input_process.start()
    self.output_process.start()

    for process in self.processes:
      process.start()

    self.input_process.join()

    for process in self.processes:
      process.join()

    self.output_process.join()

  def parse_input(self):
    for index, row in enumerate(self.input_file):
      self.input_queue.put([index, row])
      self.input_size = self.input_queue.qsize()

    for i in range(self.num_processes):
      self.input_queue.put('STOP')

  def process_row(self):
    for index, row in iter(self.input_queue.get, 'STOP'):
      self.output_queue.put([index, row[0], self.parser.parse(row[1])])

    self.output_queue.put('STOP')

  def write_output(self):
    current = 0
    buffer = {}

    for works in range(self.num_processes):
      for index, id, row in iter(self.output_queue.get, 'STOP'):
        if index != current:
          buffer[index] = [id] + row
        else:
          self.output_file.writerow([id] + row)
          current += 1

          while current in buffer:
            self.output_file.writerow(buffer[current])
            del buffer[current]
            current += 1

            if self.input_size:
              print float(current * 100) / float(self.input_size)

After some testing, I've found some strange things:

  • self.input_size is updated properly in parse_input().
  • parse_input() ends while write_output() is still running.
  • write_output() always reports that self.input_size = 0.

Can anyone tell me where I'm going wrong here? Any help is appreciated, so thank you in advance.

Blender

1 Answer


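self.input_size is a process-local variable: each process has its own copy, so the value assigned in the process running parse_input() is never seen by the process running write_output(). According to the multiprocessing documentation, you need to wrap your data in a shared container such as Value or Array to share it between processes.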
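
To illustrate the difference, here is a minimal sketch, not your actual class. The names Counter, plain_size, shared_size, count_rows and run_demo are made up for the example, and it is written in Python 3 syntax. It increments an ordinary attribute and a multiprocessing.Value side by side in a child process, then reads both back in the parent:

```python
from multiprocessing import Process, Value


class Counter(object):
    """Hypothetical stand-in for ParsingMaster, reduced to the counting part."""

    def __init__(self):
        # Ordinary attribute: every process works on its own copy.
        self.plain_size = 0
        # Shared signed integer ('i') backed by shared memory.
        self.shared_size = Value('i', 0)

    def count_rows(self, rows):
        # Runs in the child process.
        for _ in rows:
            # Changes only the child's copy of the attribute.
            self.plain_size += 1
            # Changes the shared value; the lock makes the += atomic.
            with self.shared_size.get_lock():
                self.shared_size.value += 1


def run_demo():
    counter = Counter()
    worker = Process(target=counter.count_rows, args=(range(5),))
    worker.start()
    worker.join()
    # Read both counters back in the parent process.
    return counter.plain_size, counter.shared_size.value


if __name__ == '__main__':
    print(run_demo())
```

Run as a script, this prints (0, 5): the parent never sees the child's update to the plain attribute, but it does see the update to the Value. Applied to your class, the assumption would be that self.input_size is created as a Value in __init__, before any process is started, and that parse_input() and write_output() go through self.input_size.value.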

bereal
  • Thanks. I managed to figure it out myself about an hour ago, but it's good to know I was going in the right direction ;) – Blender Mar 09 '12 at 08:50