
I'm trying to build something like a "task manager" in Django, using a background thread that waits for jobs.

import multiprocessing
from Queue import Queue, Empty


def task_maker(queue_obj):
    while True:
        try:
            print queue_obj.qsize() # << always prints 0
            _data = queue_obj.get(timeout=10)
            if _data:
                _data['function'](*_data['args'], **_data['kwargs'])
        except Empty:
            pass
        except Exception as e:
            print e


tasks = Queue()
stream = multiprocessing.Process(target=task_maker, args=(tasks,))
stream.start()


def add_task(func=lambda: None, args=(), kwargs={}):
    try:
        tasks.put({
            'function': func,
            'args': args,
            'kwargs': kwargs
        })
        print tasks.qsize() # prints the expected size: 1, 2, 3, 4...

    except Exception as e:
        print e

I'm calling "add_task" in views.py when a user makes a request. Why is the queue in "stream" always empty? What am I doing wrong?

Taras Protsenko
  • Have you considered just using [`celery`](http://docs.celeryproject.org/en/latest/index.html)? It is easy to use, mature, and plays well with Django. It really would be the best tool for this unless you have an overwhelming reason to reinvent the wheel. – dgel May 23 '14 at 16:28

2 Answers


There are two issues with the current code: 1) with multiprocessing (but not threading), qsize() is unreliable -- I suggest not using it, as it is confusing; 2) you can't directly modify an object that has been taken from a queue.

Consider two processes sending data back and forth. One won't know whether the other has modified some data, because each process's data is private. To communicate, send data explicitly, with Queue.put() or over a Pipe, as in the sketch below.
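
A minimal illustration (mine, not from the original answer) of explicit communication over a Pipe:

import multiprocessing

def child(conn):
    # the parent only sees this string because we send it explicitly
    conn.send('hello from child')
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    print parent_conn.recv()  # prints: hello from child
    p.join()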

The general way a producer/consumer system works is this:

1) jobs are stuffed into a queue
2) a worker blocks, waiting for work; when a job appears, it puts the result on a different queue
3) a manager or 'beancounter' process consumes the output from the second queue, and prints it or otherwise processes it

Have fun!

#!/usr/bin/env python

import logging, multiprocessing, sys


def myproc(arg):
    # the actual work performed for each job
    return arg * 2

def worker(inqueue, outqueue):
    # block waiting for jobs; push each result onto the output queue
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        job = inqueue.get()
        logger.info('got %s', job)
        outqueue.put(myproc(job))

def beancounter(inqueue):
    # consume and report finished results
    while True:
        print 'done:', inqueue.get()

def main():
    logger = multiprocessing.log_to_stderr(
            level=logging.INFO,
    )
    logger.info('setup')

    data_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    for num in range(5):
        data_queue.put(num)

    worker_p = multiprocessing.Process(
        target=worker, args=(data_queue, out_queue), 
        name='worker',
    )
    worker_p.start()

    bean_p = multiprocessing.Process(
        target=beancounter, args=(out_queue,),
        name='beancounter',
    )
    bean_p.start()

    worker_p.join()
    bean_p.join()
    logger.info('done')


if __name__=='__main__':
    main()
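
One note on the demo: both loops run forever, so the join() calls never return and the program has to be interrupted by hand. A common variant (a sketch, not part of the original answer) is to push a sentinel value so each stage can shut down cleanly:

SENTINEL = 'STOP'

def worker(inqueue, outqueue):
    while True:
        job = inqueue.get()
        if job == SENTINEL:
            outqueue.put(SENTINEL)  # pass the shutdown signal downstream
            return
        outqueue.put(myproc(job))

def beancounter(inqueue):
    while True:
        result = inqueue.get()
        if result == SENTINEL:
            return
        print 'done:', result

With this, main() only needs a data_queue.put(SENTINEL) after the real jobs, and both join() calls return.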
johntellsall

I've got it. I do not know why, but when I tried "threading", it worked!

import logging
import threading
from Queue import Queue, Empty

MailLogger = logging.getLogger('mail')


class TaskMaker(threading.Thread):

    def __init__(self, que):
        threading.Thread.__init__(self)
        self.queue = que

    def run(self):
        while True:
            try:
                print "start", self.queue.qsize()
                _data = self.queue.get()
                if _data:
                    print "make"
                    _data['function'](*_data['args'], **_data['kwargs'])
            except Empty:
                pass
            except Exception as e:
                print e
                MailLogger.error(e)

tasks = Queue()
stream = TaskMaker(tasks)
stream.start()


def add_task(func=lambda: None, args=(), kwargs={}):
    global tasks
    try:
        tasks.put_nowait({
            'function': func,
            'args': args,
            'kwargs': kwargs
        })

    except Exception as e:
        print e
        MailLogger.error(e)
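
For completeness, a hypothetical call site in views.py (the import path and the notify function are made-up names for illustration):

# views.py -- illustrative only
from django.http import HttpResponse
from myapp.tasks import add_task  # wherever the TaskMaker module lives

def notify(user_id, subject='hello'):
    pass  # the slow work goes here

def my_view(request):
    # enqueue and return immediately; the TaskMaker thread does the work
    add_task(func=notify, args=(request.user.id,),
             kwargs={'subject': 'welcome'})
    return HttpResponse('queued')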
Taras Protsenko
  • with multiprocessing (but not threading), the qsize() function is unreliable: with `threading` the queue lives in the same process, and so is reliable. With multiple processes, if one process checks the size while another adds an element at the same time, things can get confusing. Thus, `qsize` is unreliable with multiprocessing. – johntellsall May 26 '14 at 18:48
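
To make the thread-vs-process difference concrete, a small sketch (mine, assuming a fork-based platform such as Linux): the child process inherits a copy of a plain Queue.Queue, so a put() in the parent after the fork never reaches it:

import multiprocessing
import time
from Queue import Queue

def drain(q):
    time.sleep(1)  # give the parent time to put()
    print 'child copy holds', q.qsize(), 'item(s)'  # prints 0

if __name__ == '__main__':
    q = Queue()
    p = multiprocessing.Process(target=drain, args=(q,))
    p.start()
    q.put('job')  # lands in the parent's copy only
    p.join()

A multiprocessing.Queue (or a threading.Thread, as in the answer above) avoids the copy, which is why the threading version works.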