
I have Python code that takes a bunch of tasks and distributes them either to different threads or to different nodes on a cluster. I always end up writing a main driver script, driver.py, that takes two command-line arguments: --run-all and --run-task. The first is just a wrapper that iterates through all tasks and then calls driver.py --run-task with each task passed as an argument. Example:

== driver.py ==
import os
from optparse import OptionParser

# Determine the current script's absolute path
DRIVER = os.path.abspath(__file__)

parser = OptionParser()
parser.add_option("--run-all", dest="run_all")
parser.add_option("--run-task", dest="run_task")
(opts, args) = parser.parse_args()

if opts.run_all is not None:
    # Run all tasks
    for task in opts.run_all.split(","):
        # Call driver.py again with a specific task
        cmd = "python %s --run-task %s" % (DRIVER, task)
        # Execute on the system
        distribute_cmd(cmd)
elif opts.run_task is not None:
    # Run an individual task
    pass  # code here for processing a task...

The user would then call:

$ driver.py --run-all task1,task2,task3,task4

And each task would get distributed.

The function distribute_cmd takes a shell executable command and sends it in a system-specific way to either a node or a thread. The reason driver.py has to find its own name and call itself is that distribute_cmd needs an executable shell command; it cannot take, for example, a function name.
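For concreteness, distribute_cmd might look roughly like the sketch below; the use_cluster flag and the bsub call are assumptions for illustration only, since the real implementation is system-specific:

import subprocess
import threading

def distribute_cmd(cmd, use_cluster=False):
    # Hypothetical sketch: dispatch a shell command string either to a
    # cluster scheduler or to a local thread running a subprocess.
    if use_cluster:
        # Submit to the cluster scheduler (e.g. LSF's bsub); the exact
        # submission command depends on the site.
        subprocess.call("bsub %s" % cmd, shell=True)
    else:
        # Run locally in a background thread.
        t = threading.Thread(target=subprocess.call, args=(cmd,),
                             kwargs={"shell": True})
        t.start()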

This consideration led me to a design where the driver script has two modes and has to call itself. This has two complications: (1) the script has to find out its own path via __file__, and (2) when making this into a Python package, it's unclear where driver.py should go. It's meant to be an executable script, but if I put it in setup.py's scripts=, then I will have to find out where the scripts live (see "correct way to find scripts directory from setup.py in Python distutils?"). This does not seem to be a good solution.
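For reference, the scripts= route mentioned above would look roughly like this in setup.py (the package name and layout here are placeholders):

from distutils.core import setup

setup(
    name='mypackage',        # placeholder name
    version='0.1',
    packages=['mypackage'],
    scripts=['driver.py'],   # installed into the platform's scripts directory
)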

What's an alternative design to this? Keep in mind that the distribution of tasks has to result in an executable command that can be passed as a string to distribute_cmd. Thanks.

1 Answer

  • What you are looking for is a library that already does exactly what you need, e.g. Fabric or Celery.
  • If you were not using nodes, I would suggest using multiprocessing (see the sketch after this list).
  • This is a slightly similar question to this one.
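A minimal sketch of the multiprocessing route, assuming a process_task function that does the per-task work (both the function and the task names are placeholders):

from multiprocessing import Pool

def process_task(task):
    # placeholder for whatever --run-task currently does for one task
    print("processing %s" % task)

if __name__ == '__main__':
    tasks = ["task1", "task2", "task3", "task4"]
    pool = Pool(processes=4)  # one worker process per concurrent task
    pool.map(process_task, tasks)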

To be able to execute remotely, you need one of the following:

  • ssh access to the box; in that case you can use Fabric to send your commands.
  • a server: SocketServer, a TCP server, or anything that will accept connections.
  • an agent, or client, that will wait for data. If you are using an agent, you may as well use a broker for your messages. Celery does some of the plumbing for you: one end puts messages on the queue while the other end gets messages from the queue. If the message is a command to execute, then the agent can do an os.system() call or call subprocess.Popen().

celery example:

import os
from celery import Celery

celery = Celery('tasks', broker='amqp://guest@localhost//')

@celery.task
def run_command(command):
    return os.system(command)

You will then need a worker that binds on the queue and waits for tasks to execute. More info in the documentation.
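A sketch of how this fits together, assuming the snippet above is saved as tasks.py: start a worker against that module, then have the driver enqueue commands with .delay() instead of shelling out.

$ celery -A tasks worker --loglevel=info

# driver side (hypothetical): enqueue a command for a worker to execute
from tasks import run_command
result = run_command.delay("echo hello from a worker")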

fabric example:

the code:

from fabric.api import run

def exec_remotely(command):
    run(command)

the invocation:

$ fab exec_remotely:command='ls -lh'

More info in the documentation.
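If the driver needs to trigger this from Python rather than via the fab command line, here is a sketch using Fabric 1.x's execute helper (the host name is a placeholder):

from fabric.api import env, execute, run

env.hosts = ['user@node1']  # placeholder host

def exec_remotely(command):
    run(command)

# run the command on every host in env.hosts
execute(exec_remotely, 'ls -lh')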

Batch system case: to go back to the question...

  • distribute_cmd is something that would call bsub somescript.sh.
  • You need to find __file__ only because you are going to re-execute the same script with other parameters.
  • Because of the above, you might have a problem providing a correct distutils script.

Let's question this design.

  • Why do you need to use the same script?
  • Can your driver write scripts then call bsub?
  • Can you use temporary files?
  • Do all the nodes actually share a filesystem?
  • How do you know __file__ is going to exist on the node?

example:

import os  # needed for os.chmod below

TASK_CODE = {
    'TASK1': '''#!/usr/bin/env python
#... actual code for task1 goes here ...
''',
    'TASK2': '''#!/usr/bin/env python
#... actual code for task2 goes here ...
'''}

# driver portion
(opts, args) = parser.parse_args()
if opts.run_all is not None:
    for task in opts.run_all.split(","):
        task_path = '/tmp/taskfile_%s' % task
        with open(task_path, 'w') as task_file:
            task_file.write(TASK_CODE[task])
        # make the generated script executable so it can be run directly
        os.chmod(task_path, 0o755)
        # note: should probably do better error handling.
        distribute_cmd(task_path)
dnozay
  • I don't want to rely on a complex framework and I don't see yet how it would solve this problem - could you explain? The lowest level of execution is something like "qsub myscript.sh" - so anything I do from Python has to be a shell executable command in the end.... the question is how to allow that without knowing the script name since Python does not really support that –  Oct 20 '12 at 20:58
  • it would be helpful if you could provide more details on how the code gets distributed. do you use ssh? do you use client - server? – dnozay Oct 20 '12 at 22:39
  • The code just gets distributed by automatically making a shell script (an executable `.sh` file) and then passing that to the cluster submission system, in this case `bsub thescript.sh`. I don't know if any cluster can support `celery` or related packages in this way –  Oct 21 '12 at 00:12
  • To answer your excellent questions about the design: it does not need to be the same script. `driver` might as well call something else - but I don't see how this changes anything, it just pushes the problem to the other script. I can use temporary files, they are all on the head node. The shell script is in fact an intermediate temporary file, containing the call to my Python script. Executing `qsub thatscript.sh` does all the submission to the nodes so that I don't ever have to deal with the nodes. They do happen to share a file system but I never need to explicitly work on nodes –  Oct 22 '12 at 15:41
  • I meant for the temporary file to have the actual code to be executed rather than a call to your script. If you can rely on the filesystem being shared, then getting the correct path from __file__ is just fine. That assumption will simplify your problem, but your problem seems to be more of a batch-job-scheduling problem than distributed computing. My answer is more generic than having to work with `qsub`. – dnozay Oct 22 '12 at 18:16