
I would like your input on the following:

A short while ago I started a job at a new company, which runs processes on a cluster. There is an existing pipeline, already implemented, that roughly does the following:

  1. Big files are stored in batches of roughly 200 on a hard disk (roughly 130 GB per file).
  2. Since there is a disk quota on the cluster and copying is very I/O intensive, I have to limit myself to copying only one file over at a time.
  3. A manager Java program creates a pull script to pull the big files across the network (from NAS to cluster).
  4. After pulling, the analysis pipeline runs on the cluster (a black-box process to me).
  5. Next, an 'am I finished' script checks whether the process has completed on the cluster. If it hasn't, the script sleeps for 10 minutes and checks again; if it has, the big file gets removed (a black-box script to me).

At the moment I've made a very simple implementation of a manager program in Python that does this; once one file is done, it executes the copy for the next file and repeats over the list of jobs.

I would like to expand this program so that it copies 5 (maybe more later) big files at once, submits them to the cluster, and only removes a file and its process once it's done running.

While looking for a solution, I've seen people mention using multiple threads or multiprocessing, specifically a pool of workers. I have no experience with this yet (but one can learn, right?), and I think it's a viable option in this case. My question is: how do I set up a pool of 5 workers so that each worker does a series of tasks and, once completed, takes a new big file from the queue and repeats?

MrFronk

1 Answer


multiprocessing.Pool is designed for this exact use-case:

import multiprocessing

def process_big_file(big_file):
    print("Process {0}: Got big file {1}".format(multiprocessing.current_process(), big_file))
    return "done with {0}".format(big_file)

def get_big_file_list():
    return ['bf{0}'.format(i) for i in range(20)]  # Just a dummy list


if __name__ == "__main__":
    pool = multiprocessing.Pool(5)  # 5 worker processes in the pool
    big_file_list = get_big_file_list()
    results = pool.map(process_big_file, big_file_list)  # blocks until every file has been processed
    print(results)
    pool.close()  # no more work will be submitted
    pool.join()   # wait for the workers to exit

Output:

Process <Process(PoolWorker-1, started daemon)>: Got big file bf0
Process <Process(PoolWorker-1, started daemon)>: Got big file bf1
Process <Process(PoolWorker-3, started daemon)>: Got big file bf2
Process <Process(PoolWorker-4, started daemon)>: Got big file bf3
Process <Process(PoolWorker-5, started daemon)>: Got big file bf4
Process <Process(PoolWorker-5, started daemon)>: Got big file bf5
Process <Process(PoolWorker-5, started daemon)>: Got big file bf6
Process <Process(PoolWorker-3, started daemon)>: Got big file bf7
Process <Process(PoolWorker-3, started daemon)>: Got big file bf8
Process <Process(PoolWorker-2, started daemon)>: Got big file bf9
Process <Process(PoolWorker-2, started daemon)>: Got big file bf10
Process <Process(PoolWorker-2, started daemon)>: Got big file bf11
Process <Process(PoolWorker-4, started daemon)>: Got big file bf12
Process <Process(PoolWorker-4, started daemon)>: Got big file bf13
Process <Process(PoolWorker-4, started daemon)>: Got big file bf14
Process <Process(PoolWorker-4, started daemon)>: Got big file bf15
Process <Process(PoolWorker-4, started daemon)>: Got big file bf16
Process <Process(PoolWorker-4, started daemon)>: Got big file bf17
Process <Process(PoolWorker-4, started daemon)>: Got big file bf18
Process <Process(PoolWorker-4, started daemon)>: Got big file bf19

['done with bf0', 'done with bf1', 'done with bf2', 'done with bf3', 'done with bf4', 'done with bf5', 'done with bf6', 'done with bf7', 'done with bf8', 'done with bf9', 'done with bf10', 'done with bf11', 'done with bf12', 'done with bf13', 'done with bf14', 'done with bf15', 'done with bf16', 'done with bf17', 'done with bf18', 'done with bf19']

The pool.map call uses an internal queue to distribute all the items in big_file_list to the workers in the pool. As soon as a worker finishes a task, it pulls the next item off the queue and continues until the queue is empty.
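
To tie this back to your pipeline, the per-file sequence (pull the file, run the analysis, poll the 'am I finished' check, delete the file) can simply live inside the function you hand to pool.map. Here is a minimal sketch, reusing get_big_file_list from above, where copy_to_cluster, submit_job, job_is_finished and remove_file are hypothetical placeholders for your pull script, cluster submission, status check and cleanup:

import time
import multiprocessing

def handle_big_file(big_file):
    # Hypothetical helpers -- wire these up to your actual pull script,
    # cluster submission, 'am I finished' check and cleanup step.
    copy_to_cluster(big_file)         # pull the file from the NAS to the cluster
    job = submit_job(big_file)        # kick off the analysis pipeline
    while not job_is_finished(job):   # poll the cluster for completion
        time.sleep(600)               # wait 10 minutes between checks, like the existing script
    remove_file(big_file)             # free the disk quota again
    return "done with {0}".format(big_file)

if __name__ == "__main__":
    pool = multiprocessing.Pool(5)    # at most 5 files in flight at once
    results = pool.map(handle_big_file, get_big_file_list())
    pool.close()
    pool.join()

If the disk quota means only one copy may run at a time, the copy step could additionally be serialized with a multiprocessing.Lock handed to the workers through the pool's initializer argument, while the analysis and polling for different files still overlap.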

dano
  • I adapted this code and it works perfectly; however, I cannot seem to kill the process once it's running. Is this true? And how can I still kill the program with Ctrl-C, for example, in my terminal? – MrFronk Aug 08 '14 at 13:12
  • @MrFronk Yes, that's an unfortunate bug in `multiprocessing`. [This answer](http://stackoverflow.com/a/1408476/2073595) has a workaround. – dano Aug 08 '14 at 14:32
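
The workaround in the answer linked above boils down to calling map_async() and then get() with a timeout, which keeps the main process responsive to KeyboardInterrupt, whereas a bare pool.map() call can swallow Ctrl-C. A small sketch along those lines (not taken verbatim from the linked answer):

pool = multiprocessing.Pool(5)
try:
    # get() with a (large) timeout lets Ctrl-C reach the main process
    results = pool.map_async(process_big_file, big_file_list).get(timeout=9999999)
except KeyboardInterrupt:
    pool.terminate()  # stop the workers immediately
    pool.join()
else:
    pool.close()
    pool.join()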