I have a web server that connects to one of many serverlets. The web server might queue up to 40 jobs, each of which can take anywhere from 20 minutes to 30 hours to run.

The web server connects to a serverlet using sockets, and the serverlet runs each job it receives in a thread.

I want to put a cap on the number of threads (jobs) that can run at once, say 3. Once that limit is reached, the main thread should block; when one of the threads ends, the main thread continues and picks up another job.

# Wait for thread count to reduce before continuing
while threading.active_count() >= self.max_threads:
    pass 

I'm currently using a busy-wait loop to make my main thread wait until a free thread is available. It works, but it feels like a quick and dirty solution. Is there a better way to do this?

server.py

import socket
import sys
import urllib, urllib2
import threading
import cPickle

from supply import supply


class supply_thread(threading.Thread):

    def __init__(self, _sock):
        threading.Thread.__init__(self)
        self.__socket = _sock

    def run(self):
        data = self.readline()
        self.__socket.close()
        new_supply = supply.supply(data)
        new_supply.run()

    def readline(self):
        """ Read data sent from the webserver and decode it """

        # NB: assumes the whole pickled payload arrives in one 1024-byte recv
        data = self.__socket.recv(1024)
        if data:
            data = cPickle.loads(data)
            return data



class server:

    def __init__(self):
        ## Socket Vars
        self.__socket = None
        self.HOST = ''
        self.PORT = 50007
        self.name = socket.gethostname()

        self.max_jobs = 3


    def listen(self):
        """ Listen for a connection from the webserver """

        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allows quick connection from the same address
        self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.__socket.bind((self.HOST, self.PORT))
        return self.__socket.listen(1)

    def connect(self):
        webserver = self.__socket.accept()[0]
        print 'Connected by', webserver

        new_thread = supply_thread(webserver)
        print 'Starting thread' , new_thread.getName()

        new_thread.start()

    def close(self):
        return self.__socket.close()


    def run(self):
        import time

        while True:
            print(sys.version)

            # Wait for connection from Webserver
            self.listen()

            time.sleep(3)

            # Let the Webserver know I'm available
            self.status(status='Available')

            print 'Waiting for connection...'
            self.connect()

            print 'threads:', threading.enumerate()
            print 'thread count:', threading.active_count()

            while threading.active_count() >= self.max_jobs:
                pass


    def status(self, status='Available'):
        computer_name = socket.gethostname()
        svcURL = "http://localhost:8000/init/default/server"
        params = {
            'computer_name':computer_name,
            'status':status,
            'max_jobs':self.max_jobs
        }
        svcHandle = urllib2.urlopen(svcURL, urllib.urlencode(params))
James Burke
  • Information about how you start your threads would be helpful in suggesting ways to approach providing a pooled implementation. Do you have a list of sites and their parameters set up? You could easily add a `Queue` or even a simple `deque` and have each of a set number of threads consume from that, but it all depends on what you're starting with. – g.d.d.c May 10 '13 at 03:27
  • a simple way to limit the number of concurrent jobs is to use a pool of exactly 3 threads to run all the jobs, e.g. [to download a file using exactly 4 threads you could use `multiprocessing.dummy.Pool`](http://stackoverflow.com/a/13990003/4279) – jfs May 10 '13 at 04:05
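A minimal sketch of the thread-pool idea from the comment above, with a hypothetical `run_job` standing in for the real `supply` job:

```python
from multiprocessing.dummy import Pool  # thread-based Pool, same API as multiprocessing

def run_job(job_id):
    # hypothetical stand-in for supply.supply(data).run()
    return job_id * 2

# A pool of exactly 3 worker threads: at most 3 jobs run concurrently,
# and map() blocks the caller until every job has finished.
pool = Pool(3)
results = pool.map(run_job, range(10))
pool.close()
pool.join()
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```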

1 Answer


This sounds like a good use case for Celery.

Basically, you would create a Celery task in a tasks.py file and then call it with taskname.delay(). It will dispatch the task to a Celery worker and start working on it if the worker is ready to accept another task. Here's an example from the docs.

By default, Celery will create a worker that has concurrency equal to the number of cores in your machine according to the documentation. You can change that if you need to.

Alternatively, you could use Queues. Here's another example of how that might look.
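For instance, a rough stdlib-only sketch of the queue approach (Python 3 names; the module is `Queue` on Python 2), where `handle_job` is a hypothetical stand-in for the real work:

```python
import threading
import queue  # named Queue on Python 2

results = []

def handle_job(data):
    # hypothetical stand-in for supply.supply(data).run()
    results.append(data)

def worker(jobs):
    while True:
        data = jobs.get()
        if data is None:        # sentinel: shut this worker down
            jobs.task_done()
            break
        handle_job(data)
        jobs.task_done()

jobs = queue.Queue()

# Exactly 3 long-lived workers cap concurrency at 3 jobs at a time;
# the main thread just enqueues work instead of busy-waiting.
workers = [threading.Thread(target=worker, args=(jobs,)) for _ in range(3)]
for w in workers:
    w.start()

for n in range(10):
    jobs.put(n)
for _ in workers:
    jobs.put(None)              # one sentinel per worker

jobs.join()                     # block until every queued item is processed
```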

Kevin London
  • thanks for the suggestion, I'd like to stick with as much native Python as possible. – James Burke May 10 '13 at 04:31
  • Okay, in that case I think that Queues would be a better fit. I'll add an example shortly. – Kevin London May 10 '13 at 04:56
  • I've included an example of how a queue might work. You would put work into the queue and set a thread to watch the queue. When the queue fills, you will not be able to put any more in there. You could create three separate queues that each have a single thread, as an idea. – Kevin London May 10 '13 at 07:15
  • Thanks Kevin. To my understanding performing a .join() on a queue would halt the main thread till all jobs were complete in the queue? – James Burke May 10 '13 at 21:30
  • Ideally I'd want the main thread to resume after a job finished, not wait till all jobs have finished, otherwise it's not very efficient if the last job takes an additional 20 hours. Thanks for your advice. – James Burke May 11 '13 at 01:16
  • Sure thing. If you've found this helpful, I would appreciate if you could click the check mark to mark this answer as accepted. – Kevin London May 14 '13 at 22:02
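Picking up the concern in the last two comments: a *bounded* queue avoids waiting on the whole batch. With `maxsize=3`, `put()` blocks only while three jobs are pending and resumes as soon as one worker pulls the next job, so the main thread never waits for the slowest job. A sketch with the same hypothetical job stand-in:

```python
import threading
import queue  # named Queue on Python 2

done = []

def worker(jobs):
    while True:
        data = jobs.get()
        if data is None:       # sentinel: stop this worker
            break
        done.append(data)      # hypothetical stand-in for running the job
        jobs.task_done()

# maxsize=3 bounds the backlog: put() blocks only while 3 jobs are
# pending, and unblocks as soon as ONE worker takes the next job.
jobs = queue.Queue(maxsize=3)
workers = [threading.Thread(target=worker, args=(jobs,)) for _ in range(3)]
for w in workers:
    w.start()

for n in range(10):
    jobs.put(n)                # never waits for the whole batch to finish
for _ in workers:
    jobs.put(None)
for w in workers:
    w.join()                   # all jobs have been handled once workers exit
```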