I'd like to pass a function the client creates to a celery task and have it be executed. For example below, I'm trying to write a map
function that takes in a function f
and a list l
and do map(f, l)
in the celery task.
Presumably, the function is not getting serialized correctly (understandable, that's hard). But, is there any way to do this? What's the best practice? I suppose I could pass a string and then exec
it, but I'd rather not just the way my app is intended to work.
edit: I found a way to serialize a function... I guess I could wrap that up to do what I need to do. Any better ideas?
from celery import Celery
app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')
@app.task
def cp_map(f, l):
return map(f, l)
Then, I try to use this task with this:
In [20]: from tasks import cp_map
In [21]: def f(x): return x + 1
In [22]: cp_map.delay(f, [1,2,3])
Out[22]: <AsyncResult: 27baf8bf-8ef3-496d-a445-ebd7ee94e206>
In [23]: _.status
Out[23]: 'PENDING'
On the worker, I get this:
[2014-02-09 22:27:00,828: CRITICAL/MainProcess] Can't decode message body: DecodeError(AttributeError("'module' object has no attribute 'f'",),) (type:u'application/x-python-serialize' encoding:u'binary' raw:"'\\x80\\x02}q\\x01(U\\x07expiresq\\x02NU\\x03utcq\\x03\\x88U\\x04argsq\\x04c__main__\\nf\\nq\\x05]q\\x06(K\\x01K\\x02K\\x03e\\x86q\\x07U\\x05chordq\\x08NU\\tcallbacksq\\tNU\\x08errbacksq\\nNU\\x07tasksetq\\x0bNU\\x02idq\\x0cU$27baf8bf-8ef3-496d-a445-ebd7ee94e206q\\rU\\x07retriesq\\x0eK\\x00U\\x04taskq\\x0fU\\x0ctasks.cp_mapq\\x10U\\ttimelimitq\\x11NN\\x86U\\x03etaq\\x12NU\\x06kwargsq\\x13}q\\x14u.' (233b)"')
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 585, in _receive_callback
decoded = None if on_m else message.decode()
File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 142, in decode
self.content_encoding, accept=self.accept)
File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
return decode(data)
File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 59, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 55, in _reraise_errors
yield
File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
return decode(data)
File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 64, in pickle_loads
return load(BytesIO(s))
DecodeError: 'module' object has no attribute 'f'