14

Background

I'm looking into using celery (3.1.8) to process huge text files (~30GB) each. These files are in fastq format and contain about 118M sequencing "reads", which are essentially each a combination of header, DNA sequence, and quality string). Also, these sequences are from a paired-end sequencing run, so I'm iterating two files simultaneously (via itertools.izip). What I'd like to be able to do is take each pair of reads, send them to a queue, and have them be processed on one of the machines in our cluster (don't care which) to return a cleaned-up version of the read, if cleaning needs to happen (e.g., based on quality).

I've set up celery and rabbitmq, and my workers are launched as follows:

celery worker -A tasks --autoreload -Q transient 

and configured like:

from kombu import Queue

BROKER_URL = 'amqp://guest@godel97'
CELERY_RESULT_BACKEND = 'rpc'
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT=['pickle', 'json']
CELERY_TIMEZONE = 'America/New York'
CELERY_ENABLE_UTC = True
CELERYD_PREFETCH_MULTIPLIER = 500

CELERY_QUEUES = (
    Queue('celery', routing_key='celery'),
    Queue('transient', routing_key='transient',delivery_mode=1),
)

I've chosen to use an rpc backend and pickle serialization for performance, as well as not writing anything to disk in the 'transient' queue (via delivery_mode).

Celery startup

To set up the celery framework, I first launch the rabbitmq server (3.2.3, Erlang R16B03-1) on a 64-way box, writing log files to a fast /tmp disk. Worker processes (as above) are launched on each node on the cluster (about 34 of them) ranging anywhere from 8-way to 64-way SMP for a total of 688 cores. So, I have a ton of available CPUs for the workers to use to process of the queue.

Job submission/performance

Once celery is up and running, I submit the jobs via an ipython notebook as below:

files = [foo, bar]
f1 = open(files[0])
f2 = open(files[1])
res = []
count = 0
for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)):
    count += 1
    res.append(tasks.process_read_pair.s(r1, r2))
        if count == 10000:
        break
t.stop()
g = group(res)
for task in g.tasks:
    task.set(queue="transient")

This takes about a 1.5s for 10000 pairs of reads. Then, I call delay on the group to submit to the workers, which takes about 20s, as below:

result = g.delay()

Monitoring with rabbitmq console, I see that I'm doing OK, but not nearly fast enough.

rabbitmq graph

Question

So, is there any way to speed this up? I mean, I'd like to see at least 50,000 read pairs processed every second rather than 500. Is there anything obvious that I'm missing in my celery configuration? My worker and rabbit logs are essentially empty. Would love some advice on how to get my performance up. Each individual read pair processes pretty quickly, too:

[2014-01-29 13:13:06,352: INFO/Worker-1] tasks.process_read_pair[95ec7f2f-0143-455a-a23b-c032998951b8]: HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 1:N:0:ACAGTG HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 2:N:0:ACAGTG 0.00840497016907 sec

Up to this point

So up to this point, I've googled all I can think of with celery, performance, routing, rabbitmq, etc. I've been through the celery website and docs. If I can't get the performance higher, I'll have to abandon this method in favor of another solution (basically dividing up the work into many smaller physical files and processing them directly on each compute node with multiprocessing or something). It would be a shame to not be able to spread this load out over the cluster, though. Plus, this seems like an exquisitely elegant solution.

Thanks in advance for any help!

Chris F.
  • 773
  • 6
  • 15
  • I've used rabbitmq and found that one of my performance bottlenecks came from the act of closing/reopening the connection to the queue for each message. Once I started reusing the connection, publish rate increased by 2 orders of magnitude. I'm not familiar with the library you're using but check that it reuses the connection – Basic Jan 29 '14 at 18:38
  • Yeah, celery has a [default broker pool](http://docs.celeryproject.org/en/latest/configuration.html#std:setting-BROKER_POOL_LIMIT). I'll try to increase it and see what happens. – Chris F. Jan 29 '14 at 18:43
  • Added `BROKER_POOL_LIMIT = 1000` and bounced my workers. Unfortunately, didn't make any difference. – Chris F. Jan 29 '14 at 18:49
  • Scraping the bottom of the barrel but... How large are your individual messages? Is it possible you're saturating your network connection? – Basic Jan 29 '14 at 19:29
  • r1 and r2 are about 80 bytes (per `sys.getsizeof`), since it's a tuple. Individually, the header, sequence, and quality are 90, 139, and 139 bytes, respectively. Net links are gigabit. – Chris F. Jan 29 '14 at 19:54
  • even if I do something like: `tasks.speed.apply_async(args=(i,), queue="transient")`, where speed() just returns what I send it, I can only get ~500/s. hmmm... – Chris F. Jan 29 '14 at 20:19
  • As yet another test, I fired up another worker on the same box as the rabbitmq broker, put it into its own queue, and ran the speed test from above again. Same results - just under 500/s! Ugh. – Chris F. Jan 29 '14 at 20:44
  • Is there any reason you want to use celery or rabbitmq specifically? – ionelmc Jan 31 '14 at 20:11
  • @ionelmc - other than it seeming like an elegant and performant solution, not really. I'm also looking at ways to make better use of our computational facility here. celerly (w/rabbit) seemed like a low barrier to entry. I've also experimented with other distributed caches/queues in the past, with mixed results (e.g., HazelCast). – Chris F. Jan 31 '14 at 21:00
  • @ChrisF. if you want to process large amounts of data then you want to minimize data copying and overhead. Eg: you could use zeromq (http://zguide.zeromq.org/page:all#Divide-and-Conquer) and just stream the binary data directly. I assume here that you don't need celery's flexibility and features - you just want to quickly process tons of data. – ionelmc Feb 01 '14 at 12:28

6 Answers6

2

Not an answer but too long for a comment.

Let's narrow the problem down a little...

Firstly, try skipping all your normal logic/message preparation and just do the tightest possible publishing loop with your current library. See what rate you get. This will identify if it's a problem with your non-queue-related code.

If it's still slow, set up a new python script but use amqplib instead of celery. I've managed to get it publishing at over 6000/s while doing useful work (and json encoding) on a mid-range desktop, so I know that it's performant. This will identify if the problem is with the celery library. (To save you time, I've snipped the following from a project of mine and hopefully not broken it when simplifying...)

from amqplib import client_0_8 as amqp
try:
    lConnection = amqp.Connection(
        host=###,
        userid=###,
        password=###,
        virtual_host=###,
        insist=False)
    lChannel = lConnection.channel()
    Exchange = ###

    for i in range(100000):
        lMessage = amqp.Message("~130 bytes of test data..........................................................................................................")
        lMessage.properties["delivery_mode"] = 2
        lChannel.basic_publish(lMessage, exchange=Exchange)

    lChannel.close()
    lConnection.close()

except Exception as e:
    #Fail

Between the two approaches above you should be able to track down the problem to one of the Queue, the Library or your code.

Basic
  • 26,321
  • 24
  • 115
  • 201
  • 1
    Thanks! good testing. Using your code, I'm getting over 7k/sec to the exchange, slightly less with delivery mode dropping to disk. [Mode 2](https://dl.dropboxusercontent.com/u/861789/Screen%20Shot%202014-01-30%20at%2012.36.52%20PM.png), [Mode 1](https://dl.dropboxusercontent.com/u/861789/Screen%20Shot%202014-01-30%20at%2012.35.35%20PM.png) – Chris F. Jan 30 '14 at 17:40
  • Ok, well that's a step in the right direction. At least we know the queue is working... I suppose the next easiest step would be to drop amqplib into your original code and see what the speed there is like. If it's high, we'll call that a win. If not, it proves that the problem is nothing to do with the queues and is instead with your code - perhaps the structure you're using to store the data is slow enumerating or something similar. If it is your code, [profiling is the way to go](http://stackoverflow.com/a/582337/156755). It's not a great profiler but the best I've found in pythong so far. – Basic Jan 30 '14 at 17:57
2

Reusing the producer instance should give you some performance improvement:

with app.producer_or_acquire() as producer:
    task.apply_async(producer=producer)

Also the task may be a proxy object and if so must be evaluated for every invocation:

task = task._get_current_object()

Using group will automatically reuse the producer and is usually what you would do in a loop like this:

process_read_pair = tasks.process_read_pair.s
g = group(
    process_read_pair(r1, r2)
    for r1, r2 in islice(
        izip(FastGeneralIterator(f1), FastGeneralIterator(f2)), 0, 1000)
)
result = g.delay()

You can also consider installing the librabbitmq module which is written in C. The amqp:// transport will automatically use it if available (or can be specified manually using librabbitmq://:

pip install librabbitmq

Publishing messages directly using the underlying library may be faster since it will bypass the celery routing helpers and so on, but I would not think it was that much slower. If so there is definitely room for optimization in Celery, as I have mostly focused on optimizing the consumer side so far.

Note also that you may want to process multiple DNA pairs in the same task, as using coarser task granularity may be beneficial for CPU/memory caches and so on, and it will often saturate parallelization anyway since that is a finite resource.

NOTE: The transient queue should be durable=False

asksol
  • 19,129
  • 5
  • 61
  • 68
  • Note that the worker should be able to consume 50.000 tasks a second when using the solo pool and no results, though I haven't been benchmarking in some time so performance may have regressed a bit. When publishing it may also help to use multiple connections. – asksol Jan 31 '14 at 14:04
  • Thanks, some more stuff to try, for sure. I've been using the C client, which wasn't obvious from my code snippets. My current round of testing with using the redis backend instead of rpc and submitting using group in batches of 50k read pairs, allows me to submit at 5000-ish/sec, which while not awesome, is definitely better than before. – Chris F. Jan 31 '14 at 18:57
1

One solution you have is that the reads are highly compressible so replacing the following

res.append(tasks.process_read_pair.s(r1, r2))

by

res.append(tasks.process_bytes(zlib.compress(pickle.dumps((r1, r2))),
                                      protocol = pickle.HIGHEST_PROTOCOL),
                         level=1))

and calling a pickle.loads(zlib.decompress(obj)) on the other side.

It should win a factor around big factor for long enough DNA sequence if they are not long enough you can grouping them by chunk in an array which you dumps and compress.

another win can be to use zeroMQ for transport if you don't do yet.

I'm not sure what process_byte should be

Xavier Combelle
  • 10,968
  • 5
  • 28
  • 52
  • Just using zlib costs me about 100/s. Isn't list basically the same thing? `res.append(tasks.process_read_pair.apply_async(args=(r1, r2), queue="transient", compression='zlib'))` – Chris F. Jan 29 '14 at 18:37
  • Also not sure it would work the way you wrote it. Wouldn't you need to compress r1, r2, rather than compress the entire task? – Chris F. Jan 29 '14 at 18:40
  • I don't know that you tryied compression='zlib' You are right zlib is the same than level=9 however you can specify level=1 to have little impact on performance – Xavier Combelle Jan 29 '14 at 19:17
0

Again, not an answer, but too long for comments. Per Basic's comments/answer below, I set up the following test using the same exchange and routing as my application:

from amqplib import client_0_8 as amqp
try:
    lConnection = amqp.Connection()
    lChannel = lConnection.channel()
    Exchange = 'celery'

    for i in xrange(1000000):
        lMessage = amqp.Message("~130 bytes of test data..........................................................................................................")
        lMessage.properties["delivery_mode"] = 1
        lChannel.basic_publish(lMessage, exchange=Exchange, routing_key='transient')

    lChannel.close()
    lConnection.close()

except Exception as e:
    print e

You can see that it's rocking right along.

test

I guess now it's up to finding out the difference between this and what's going on inside of

Community
  • 1
  • 1
Chris F.
  • 773
  • 6
  • 15
0

I added amqp into my logic, and it's fast. FML.

from amqplib import client_0_8 as amqp
try:
    import stopwatch
    lConnection = amqp.Connection()
    lChannel = lConnection.channel()
    Exchange = 'celery'

    t = stopwatch.Timer()
    files = [foo, bar]
    f1 = open(files[0])
    f2 = open(files[1])
    res = []
    count = 0
    for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)):
        count += 1
        #res.append(tasks.process_read_pair.s(args=(r1, r2)))
        #lMessage = amqp.Message("~130 bytes of test data..........................................................................................................")
        lMessage = amqp.Message(" ".join(r1) + " ".join(r2))
        res.append(lMessage)
        lMessage.properties["delivery_mode"] = 1
        lChannel.basic_publish(lMessage, exchange=Exchange, routing_key='transient')
        if count == 1000000:
            break
    t.stop()
    print "added %d tasks in %s" % (count, t)

    lChannel.close()
    lConnection.close()

except Exception as e:
    print e

img

So, I made a change to submit an async task to celery in the loop, as below:

res.append(tasks.speed.apply_async(args=("FML",), queue="transient"))

The speed method is just this:

@app.task()
def speed(s):
    return s

Submitting the tasks I'm slow again!

img

So, it doesn't appear to have anything to do with:

  1. How I'm iterating to submit to the queue
  2. The message that I'm submitting

but rather, it has to do with the queueing of the function?!?! I'm confused.

Chris F.
  • 773
  • 6
  • 15
0

Again, not an answer, but more of an observation. By simply changing my backend from rpc to redis, I more than triple my throughput:

img

Chris F.
  • 773
  • 6
  • 15