I have a primitive producer/consumer script running in gevent. It starts a few producer functions that put things into a gevent.queue.Queue, and one consumer function that fetches them out of the queue again:

from __future__ import print_function

import time

import gevent
import gevent.queue
import gevent.monkey

q = gevent.queue.Queue()

# define and spawn a consumer
def consumer():
    while True:
        item = q.get(block=True)
        print('consumer got {}'.format(item))

consumer_greenlet = gevent.spawn(consumer)

# define and spawn a few producers
def producer(ID):
    while True:
        print("producer {} about to put".format(ID))
        q.put('something from {}'.format(ID))
        time.sleep(0.1)
#       consumer_greenlet.switch()      

producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]

# wait indefinitely
gevent.monkey.patch_all()
print("about to join")
consumer_greenlet.join()

It works fine if I let gevent handle the scheduling implicitly (e.g. by calling time.sleep or some other gevent.monkey.patch()ed function). However, when I switch to the consumer explicitly (replacing time.sleep with the commented-out switch call), gevent raises an AssertionError:

Traceback (most recent call last):
  File "/my/virtualenvs/venv/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "switch_test.py", line 14, in consumer
    item = q.get(block=True)
  File "/my/virtualenvs/venv/lib/python2.7/site-packages/gevent/queue.py", line 201, in get
    assert result is waiter, 'Invalid switch into Queue.get: %r' % (result, )
AssertionError: Invalid switch into Queue.get: ()
<Greenlet at 0x7fde6fa6c870: consumer> failed with AssertionError

I would like to employ explicit switching because in production I have a lot of producers, gevent's scheduling does not allocate nearly enough runtime to the consumer, and the queue gets longer and longer (which is bad). Alternatively, any insights into how to configure or modify gevent's scheduler are greatly appreciated.

This is on Python 2.7.2, gevent 1.0.1 and greenlet 0.4.5.

Simon
  • Maybe you could look at the size of the queue and make the producers pause if it's above a certain size? It sounds like the underlying issue is that the producers (if left unchecked) produce way more than the consumer can handle? How come you're using gevent for this as opposed to threads or multiprocessing? – Tom Dalton Aug 03 '15 at 10:50
  • I have actually already implemented the waiting you suggested, but it leads to a lot of waiting on the producer side, which reduces system throughput (and this system really needs to get things done quickly...). The consumer could certainly handle the load: the producers load and analyze URLs, and all the consumer does is write results to the database. I use gevent because it reduces the synchronization headaches that threads introduce, and because multiprocessing consumes significant amounts of memory once more than a few dozen worker processes are spawned. – Simon Aug 03 '15 at 12:38
  • I think I'm missing something here. If the producers are able to produce more than the consumer can handle, surely your only other option is to add more consumers, or reduce the amount the producers are producing (which is what the waiting does). I'm not sure what you mean by the reduced system throughput - isn't the bottleneck the limitation on how quickly the consumer can process the queue? What synchronisation issues does threading introduce that gevent doesn't? – Tom Dalton Aug 03 '15 at 12:52
  • The producers load URLs from the web, waiting ~1s for a reply on average, and put some extracted data into the queue (this only takes a few ms). The consumer commits those data items to the database (this also takes no more than a few ms). From these numbers, I should be able to have hundreds of producers without saturating the CPU, but gevent would have to allocate half the CPU time to the consumer, which it doesn't. – Simon Aug 03 '15 at 16:00
  • Here's a good explanation on threading vs. greenlets: http://stackoverflow.com/questions/15556718/greenlet-vs-threads – Simon Aug 03 '15 at 16:01
  • That thread doesn't answer my question about what sync problems you are avoiding by using gevent over threading. Why do you have a need for a producer/consumer setup over each worker getting the data and then storing the data as a single 'task'? Then you can have as many workers as you like without needing to balance producer/consumer resources. – Tom Dalton Aug 03 '15 at 22:58
  • I'd rather discuss the problem at hand and not my design choices, please. – Simon Aug 04 '15 at 07:09

1 Answer

It seems to me that explicit switching doesn't play well with implicit switching. You already have implicit switches happening, either because of monkey-patched I/O or because of the gevent.queue.Queue.

The gevent documentation discourages usage of the raw greenlet methods:

Being a greenlet subclass, Greenlet also has switch() and throw() methods. However, these should not be used at the application level as they can very easily lead to greenlets that are forever unscheduled. Prefer higher-level safe classes, like Event and Queue, instead.

Iterating over a gevent.queue.Queue or calling its get method switches implicitly; interestingly, put does not. So you have to trigger a switch yourself. The easiest way is to call gevent.sleep(0) (you don't have to actually wait for a specific time).
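
To illustrate the difference, here is a minimal sketch (not part of the original code; the run helper and its yield_after_put flag are made-up names for this example). It assumes an unbounded queue, one producer and one consumer, and records the order in which puts and gets happen, with and without gevent.sleep(0) after each put:

```python
import gevent
import gevent.queue


def run(yield_after_put):
    """Run one producer and one consumer; return the recorded event order.

    `run` and `yield_after_put` are hypothetical names for this sketch.
    """
    q = gevent.queue.Queue()  # unbounded, so put() never has to wait
    events = []

    def consumer():
        for _ in range(3):
            item = q.get()  # blocks (and switches) while the queue is empty
            events.append('got {}'.format(item))

    def producer():
        for i in range(3):
            q.put(i)  # returns without switching to the consumer
            events.append('put {}'.format(i))
            if yield_after_put:
                gevent.sleep(0)  # hand control back to the hub

    greenlets = [gevent.spawn(consumer), gevent.spawn(producer)]
    gevent.joinall(greenlets)
    return events


if __name__ == '__main__':
    print(run(yield_after_put=False))
    print(run(yield_after_put=True))
```

Without the sleep, the producer should finish all three puts before the consumer sees anything; with it, puts and gets should alternate.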

In conclusion, you don't even have to monkey-patch anything, provided that your code does not perform blocking I/O operations.

I would rewrite your code like this:

from __future__ import print_function  # so print('a', b) behaves the same on Python 2.7

import gevent
import gevent.queue

q = gevent.queue.Queue()

# define and spawn a consumer
def consumer():
    for item in q:
        print('consumer got {}'.format(item))

consumer_greenlet = gevent.spawn(consumer)

# define and spawn a few producers
def producer(ID):
    print('producer started', ID)
    while True:
        print("producer {} about to put".format(ID))
        q.put('something from {}'.format(ID))
        gevent.sleep(0)

producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]
# wait indefinitely
print("about to join")
consumer_greenlet.join()
sanyi
  • Wow, I completely overlooked your answer, thank you! `put()` not switching implicitly is an interesting observation, I'll have a look at the production code and see if that changes things... – Simon Dec 02 '15 at 10:23