Background
I'm looking into using celery (3.1.8) to process huge text files (~30GB each). These files are in fastq format and contain about 118M sequencing "reads", each of which is essentially a combination of a header, a DNA sequence, and a quality string. The sequences come from a paired-end sequencing run, so I'm iterating over two files simultaneously (via itertools.izip). What I'd like to do is take each pair of reads, send it to a queue, and have it processed on one of the machines in our cluster (I don't care which), returning a cleaned-up version of the read if cleaning needs to happen (e.g., based on quality).
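For context, the worker-side task is roughly the sketch below; the cleaning logic is reduced here to a placeholder quality trim, not the real implementation:

# Sketch of the worker task; clean_read() and its cutoff are placeholders for
# the actual quality-based cleaning, which isn't shown in this post.
from celery import Celery

app = Celery('tasks')  # broker/queue settings are applied via the config shown below

def clean_read(header, seq, qual, cutoff=20):
    # Placeholder: trim trailing bases whose Phred+33 quality falls below the cutoff
    keep = len(qual)
    while keep and (ord(qual[keep - 1]) - 33) < cutoff:
        keep -= 1
    return header, seq[:keep], qual[:keep]

@app.task
def process_read_pair(r1, r2):
    # r1 and r2 are (header, sequence, quality) tuples from FastqGeneralIterator
    return clean_read(*r1), clean_read(*r2)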
I've set up celery and rabbitmq, and my workers are launched as follows:
celery worker -A tasks --autoreload -Q transient
and configured like:
from kombu import Queue
BROKER_URL = 'amqp://guest@godel97'
CELERY_RESULT_BACKEND = 'rpc'
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_TIMEZONE = 'America/New_York'
CELERY_ENABLE_UTC = True
CELERYD_PREFETCH_MULTIPLIER = 500
CELERY_QUEUES = (
    Queue('celery', routing_key='celery'),
    Queue('transient', routing_key='transient', delivery_mode=1),
)
I've chosen the rpc result backend and pickle serialization for performance, and the 'transient' queue uses delivery_mode=1 so nothing is written to disk.
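The settings above are loaded into the app in the usual way, roughly like this (the celeryconfig module name is illustrative):

# tasks.py (sketch): pull in the settings above; the worker is started with -A tasks
from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')  # BROKER_URL, CELERY_QUEUES, etc.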
Celery startup
To set up the celery framework, I first launch the rabbitmq server (3.2.3, Erlang R16B03-1) on a 64-way box, writing log files to a fast /tmp disk. Worker processes (as above) are launched on each node of the cluster (about 34 of them), ranging anywhere from 8-way to 64-way SMP, for a total of 688 cores. So I have a ton of CPUs available for the workers to process the queue.
Job submission/performance
Once celery is up and running, I submit the jobs from an IPython notebook, as below:
from itertools import izip

from Bio.SeqIO.QualityIO import FastqGeneralIterator
from celery import group

import tasks

files = [foo, bar]
f1 = open(files[0])
f2 = open(files[1])

# Build a task signature for each read pair (capped at 10000 pairs for this test)
res = []
count = 0
for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)):
    count += 1
    res.append(tasks.process_read_pair.s(r1, r2))
    if count == 10000:
        break

# Group the signatures and route every task to the 'transient' queue
g = group(res)
for task in g.tasks:
    task.set(queue="transient")
This takes about 1.5s for 10000 pairs of reads. Then I call delay on the group to submit it to the workers, which takes about 20s, as below:
result = g.delay()
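The 1.5s and 20s figures are simple wall-clock timings, along the lines of:

import time

# Illustrative timing of the submission step (g is the group built above)
t0 = time.time()
result = g.delay()
print('group submit took %.1f s' % (time.time() - t0))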
Monitoring with the RabbitMQ console, I see that I'm doing OK, but not nearly fast enough: 10000 pairs submitted in about 20s works out to only around 500 pairs per second.
Question
So, is there any way to speed this up? I'd like to see at least 50,000 read pairs processed per second rather than 500. Is there anything obvious that I'm missing in my celery configuration? My worker and rabbit logs are essentially empty, and I'd love some advice on how to get the throughput up. Each individual read pair processes pretty quickly, too:
[2014-01-29 13:13:06,352: INFO/Worker-1] tasks.process_read_pair[95ec7f2f-0143-455a-a23b-c032998951b8]: HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 1:N:0:ACAGTG HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 2:N:0:ACAGTG 0.00840497016907 sec
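Put differently, the workers themselves don't look like the bottleneck; a rough back-of-envelope check with the numbers above gives plenty of headroom:

# Illustrative arithmetic only, using the figures quoted above
cores = 688        # total worker cores across the cluster
per_pair = 0.0084  # seconds per read pair, from the log line above
print(cores / per_pair)  # roughly 82,000 pairs/s of theoretical capacity

So the ~500 pairs/s I'm actually seeing appears to be limited by how fast I can build and submit the tasks rather than by the processing itself.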
Up to this point
So far I've googled everything I can think of about celery, performance, routing, rabbitmq, etc., and I've been through the celery website and docs. If I can't get the performance higher, I'll have to abandon this method in favor of another solution (basically dividing the work into many smaller physical files and processing them directly on each compute node with multiprocessing or something). It would be a shame not to be able to spread this load out over the cluster, though. Plus, this seems like an exquisitely elegant solution.
Thanks in advance for any help!