This is a long one.
I have a list of usernames and passwords. For each one I want to log in to the account and do some things. I want to use several machines to do this faster. My plan is to have a main machine whose only job is to run a cron job that periodically checks whether the rabbitmq queue is empty. If it is, it reads the list of usernames and passwords from a file and sends them to the rabbitmq queue. A bunch of other machines are subscribed to that queue; each one receives a user/pass, does stuff with it, acknowledges it, and moves on to the next one, until the queue is empty and the main machine fills it up again. So far I think I have everything down.
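For reference, the refill step on the main machine could look roughly like the sketch below. It is written for Python 3 and makes several assumptions: `refill_if_empty` is a made-up helper name, `channel` is expected to behave like a pika blocking channel, and packing each login as a tab-separated string is only one possible message format. One caveat: as far as I understand, the `message_count` returned by `queue_declare` covers only messages that are ready for delivery, not ones a worker has received but not yet acknowledged, so a refill could start while the last few logins are still being processed.

```python
def refill_if_empty(channel, queue_name, logins):
    """Publish every (user, password) pair if the queue has no ready messages.

    `channel` is assumed to behave like a pika blocking channel.
    `logins` is a list of (user, password) tuples.
    Returns the number of messages published.
    """
    # queue_declare is idempotent and its reply carries the current message count.
    declared = channel.queue_declare(queue=queue_name)
    if declared.method.message_count > 0:
        return 0  # work is still pending, nothing to do

    for user, password in logins:
        # Hypothetical wire format: "user<TAB>password"
        channel.basic_publish(exchange='',
                              routing_key=queue_name,
                              body='%s\t%s' % (user, password))
    return len(logins)
```

The cron job would open a connection, call this once with the logins read from the file, and close the connection again.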
Now comes my problem. I have checked that the work to be done for each user/pass isn't very intensive, so each machine could handle three of them simultaneously using Python's threading. In fact, for a single machine I have implemented this: I load the user/passes into a Python Queue() and have three threads consume that Queue(). Now I want to do something similar, but instead of consuming from a Python Queue(), each thread of each machine should consume from a rabbitmq queue. This is where I'm stuck. To run tests I started from rabbitmq's tutorial.
send.py:
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
message = ' '.join(sys.argv[1:])
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
connection.close()
worker.py:
import time, pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print ' [x] received %r' % (body,)
    time.sleep( body.count('.') )
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
channel.start_consuming()
With the above you can run two instances of worker.py; both subscribe to the rabbitmq queue and consume as expected.
My threading without rabbitmq is something like this:
runit.py:
import threading, Queue

class Threaded_do_stuff(threading.Thread):
    def __init__(self, user_queue):
        threading.Thread.__init__(self)
        self.user_queue = user_queue

    def run(self):
        while True:
            login = self.user_queue.get()
            # 'pass' is a reserved word, so the keyword argument is named 'password'
            do_stuff(user=login[0], password=login[1])
            self.user_queue.task_done()

user_queue = Queue.Queue()
for i in range(3):
    td = Threaded_do_stuff(user_queue)
    td.setDaemon(True)
    td.start()

## fill up the queue
for user in list_users:
    user_queue.put(user)

## go!
user_queue.join()
This also works as expected: you fill up the queue and have 3 threads subscribe to it. Now what I want to do is something like runit.py but instead of using a python Queue(), using something like worker.py where the queue is actually a rabbitmq queue.
Here's something I tried that didn't work (and I don't understand why):
rabbitmq_runit.py:
import time, threading, pika

class Threaded_worker(threading.Thread):
    def callback(self, ch, method, properties, body):
        print ' [x] received %r' % (body,)
        time.sleep( body.count('.') )
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def __init__(self):
        threading.Thread.__init__(self)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='hello')
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.callback, queue='hello')

    def run(self):
        print 'start consuming'
        self.channel.start_consuming()

for _ in range(3):
    print 'launch thread'
    td = Threaded_worker()
    td.setDaemon(True)
    td.start()
I would expect this to launch three threads, each of which blocks in .start_consuming() and just stays there waiting for the rabbitmq queue to send it something. Instead, the program starts, prints a few lines, and exits. The pattern of the output before it exits is weird too:
launch thread
launch thread
start consuming
launch thread
start consuming
In particular notice there is one "start consuming" missing.
What's going on?
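One thing that may be relevant, independent of rabbitmq: in Python, daemon threads do not keep the process alive, so once the main thread reaches the end of the script any daemon thread still running is stopped. The sketch below is a stand-alone illustration in Python 3 (the listings above are Python 2). `time.sleep` stands in for a blocking `start_consuming()`, and `SCRIPT` and `run` are names made up for this example. I have not confirmed that this is the whole story for rabbitmq_runit.py.

```python
import subprocess
import sys
import textwrap

# A tiny script that starts three daemon threads and optionally joins them.
SCRIPT = textwrap.dedent("""
    import sys, threading, time

    def worker():
        time.sleep(0.3)            # stands in for a blocking start_consuming()
        print("worker finished")

    threads = [threading.Thread(target=worker, daemon=True) for _ in range(3)]
    for t in threads:
        t.start()
    if sys.argv[1] == "join":
        for t in threads:
            t.join()               # keeps the main thread alive until workers end
    print("main thread done")
""")

def run(mode):
    """Run SCRIPT in a fresh interpreter and return its stdout as a list of lines."""
    result = subprocess.run([sys.executable, "-c", SCRIPT, mode],
                            capture_output=True, text=True, check=True)
    return result.stdout.splitlines()
```

Calling `run("nojoin")` shows only the main thread's line, because the process ends while the workers are still sleeping; `run("join")` also shows the three worker lines. If the same thing happens in rabbitmq_runit.py, it would also account for a thread being stopped before it prints 'start consuming'.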
EDIT: One answer I found to a similar question is in "Consuming a rabbitmq message queue with multiple threads (Python Kombu)", and the answer there is to "use celery", whatever that means. I don't buy it; I shouldn't need anything remotely as sophisticated as Celery. In particular, I'm not trying to set up RPC and I don't need to read replies from the do_stuff routines.
EDIT 2: The print pattern I expected is the following. I run
python send.py first message......
python send.py second message.
python send.py third message.
python send.py fourth message.
and the print pattern would be
launch thread
start consuming
[x] received 'first message......'
launch thread
start consuming
[x] received 'second message.'
launch thread
start consuming
[x] received 'third message.'
[x] received 'fourth message.'