0

I have written a Python script for tiling imagery using the GDAL open source library and the command line utilities provided with that library. First, I read an input dataset that tells me each tile extent. Then, I loop through the tiles and start a subprocess to call gdalwarp in order to clip the input image to the current tile in the loop.

I don't to use Popen.wait() because this will keep the tiles from being processed concurrently, but I do want to keep track of any messages returned by the subprocess. In addition, once a particular tile is done being created, I need to calculate the statistics for the new file using gdalinfo, which requires another subprocess.

Here is the code:

processing = {}
for tile in tileNums:
    subp = subprocess.Popen(['gdalwarp', '-ot', 'Int16', '-r', 'cubic', '-of', 'HFA', '-cutline', tileIndexShp, '-cl', os.path.splitext(os.path.basename(tileIndexShp))[0], '-cwhere', "%s = '%s'" % (tileNumField, tile), '-crop_to_cutline', os.path.join(inputTileDir, 'mosaic_Proj.vrt'), os.path.join(outputTileDir, "Tile_%s.img" % regex.sub('_', tile))], stdout=subprocess.PIPE)
    processing[tile] = [subp]

while processing:
    for tile, subps in processing.items():
        for idx, subp in enumerate(subps):
            if subp == None: continue
            poll = subp.poll()
            if poll == None: continue
            elif poll != 0:
                subps[idx] = None
                print tile, "%s Unsuccessful" % ("Retile" if idx == 0 else "Statistics")
            else:
                subps[idx] = None
                print tile, "%s Succeeded" % ("Retile" if idx == 0 else "Statistics")
                if subps == [None, None]:
                    del processing[tile]
                    continue
                subps.append(subprocess.Popen(['gdalinfo', '-stats', os.path.join(outputTileDir, "Tile_%s.img" % regex.sub('_',tile))], stdout=subprocess.PIPE))

For the most part, this works for me, but the one issue I am seeing is that it seems to create an infinite loop when it gets to the last tile. I know this is not the best way to do this, but I am very new to the subprocess module and I basically just threw this together to try and get it to work.

Can anyone recommend a better way to loop through the list of tiles, spawn a subprocess for each tile that can process concurrently, and spawn a second subprocess when the first completes for each tile?

UPDATE: Thanks for all the advice so far. I tried to refactor the code above to take advantage of the multiprocessing module and Pool.

Here is the new code:

def ProcessTile(tile):

    tileName = os.path.join(outputTileDir, "Tile_%s.img" % regex.sub('_', tile))

    warp = subprocess.Popen(['gdalwarp', '-ot', 'Int16', '-r', 'cubic', '-of', 'HFA', '-cutline', tileIndexShp, '-cl', os.path.splitext(os.path.basename(tileIndexShp))[0], '-cwhere', "%s = '%s'" % (tileNumField, tile), '-crop_to_cutline', os.path.join(inputTileDir, 'mosaic_Proj.vrt'), tileName], stdout=subprocess.PIPE)
    warpMsg = tile, "Retile %s" % "Successful" if warp.wait() == 0 else "Unsuccessful"

    info = subprocess.Popen(['gdalinfo', '-stats', tileName], stdout=subprocess.PIPE)
    statsMsg = tile, "Statistics %s" % "Successful" if info.wait() == 0 else "Unsuccessful"

    return warpMsg, statsMsg

print "Retiling..."
pool = multiprocessing.Pool()
for warpMsg, statsMsg in pool.imap_unordered(ProcessTile, tileNums): print "%s\n%s" % (warpMsg, statsMsg)

This is causing some major problems for me. First of all, I end up with many new processes being created. About half are python.exe and the other half are another gdal utility that I call before the code above to mosaic the incoming imagery if it is already tiled in another tiling scheme (gdalbuildvrt.exe). Between all the python.exe and gdalbuildvrt.exe processes that are being created, about 25% of my CPU (Intel I7 with 8 cores when hyperthreaded) and 99% of my 16gb of RAM are in use and the computer completely hangs. I can't even kill the processes in Task Manager or via command line with taskkill.

What am I missing here?

Brian
  • 2,702
  • 5
  • 37
  • 71
  • You could use threading and spawn threads that run those processes and wait for them to finish. – Blender Dec 17 '12 at 21:54
  • I'm trying to avoid waiting for one subprocess to finish before starting the next one. Obviously, I have to wait to start the second subprocess that will operate on the same tile, but I don't want to wait for one tile to complete processing for the next tile to begin. – Brian Dec 17 '12 at 21:56
  • related: [examples of managing subprocesses using multiprocessing.Pool, concurrent.futures, threading + Queue](http://stackoverflow.com/a/9874484/4279) – jfs Dec 17 '12 at 22:05
  • @Brian: if can wait for more than one subprocess simultaneously in different threads, see the example I've linked – jfs Dec 17 '12 at 22:06
  • Consider modifying your ProcessTile function so it takes a JoinableQueue as its argument so that it will iteratively pull tile numbers from the queue until it is empty. Then, you can create a fixed number of processes to pull from the queue to avoid creating too many processes. After you have populated the queue and created your ProcessTile processes, your main process will just call `join` on the queue to wait until all the tiles have been processed. – bogatron Dec 18 '12 at 13:41

2 Answers2

2

Instead of spawning and managing individual subprocesses, use the python multiprocessing module to create a Pool of processes.

bogatron
  • 18,639
  • 6
  • 53
  • 47
  • So would I just write a function that takes a single tile as an input, creates the first subprocess (Popen), waits for that subprocess to complete (wait()), then starts the second subprocess and waits for the second subprocess to complete? If I had this function I could use Pool.map to apply that function to each tile in a separate process? – Brian Dec 17 '12 at 22:16
  • You would write a function that spawns a gdal process for a particular tile (either view subprocess or just os.system) and waits for it. That function will be called multiple times by each python process in the Pool. Yes, you can either use `map` or, if you want to have a fixed number of processes, you could use a Queue (described at the same URL). If you have one process for each tile, then the function would just process a single tile and return (see section 16.6.1.2). – bogatron Dec 17 '12 at 22:22
0

I haven't tested it, but it should work:

import Queue

from threading import Thread

class Consumer(Thread):
    def __init__(self, queue=None):
        super(Consumer, self).__init__()

        self.daemon = True
        self.queue = queue


    def run(self):
        while True:
            task = self.queue.get()

            # Spawn your process and .wait() for it to finish.

            self.queue.task_done()

if __name__ == '__main__':
     queue = Queue.Queue()

     for task in get_tasks():
         queue.put(task)

     # You spawn 20 worker threads to process your queue nonstop
     for i in range(20):
         consumer = Consumer(queue)
         consumer.start()

     queue.join()

Basically, you have a queue filled with tasks that you need to accomplish. Then, you just spawn 20 worker threads to continually pull new tasks from the queue and process them concurrently.

Blender
  • 289,723
  • 53
  • 439
  • 496
  • The tricky bit is that he has 2 types of tasks, and the first 1 needs to push the second 1 to the task queue on success. Not too tricky sure, but means the consumer needs to be able to write to the same queue it reads from, and he needs to wrap the first popen in a function that handles queuing the second. – Silas Ray Dec 17 '12 at 22:10