I'd like to pass a function that the client creates to a Celery task and have the task execute it. In the example below, I'm trying to write a map function that takes a function f and a list l and runs map(f, l) inside the Celery task.

Presumably, the function is not getting serialized correctly (understandable, that's hard). But is there any way to do this? What's the best practice? I suppose I could pass a string and then exec it, but given how my app is intended to work, I'd rather not.

Edit: I found a way to serialize a function... I guess I could wrap that up to do what I need. Any better ideas?


from celery import Celery

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

@app.task
def cp_map(f, l):
    return map(f, l)

Then I call the task like this:

In [20]: from tasks import cp_map

In [21]: def f(x): return x + 1

In [22]: cp_map.delay(f, [1,2,3])
Out[22]: <AsyncResult: 27baf8bf-8ef3-496d-a445-ebd7ee94e206>

In [23]: _.status
Out[23]: 'PENDING'

On the worker, I get this:

[2014-02-09 22:27:00,828: CRITICAL/MainProcess] Can't decode message body: DecodeError(AttributeError("'module' object has no attribute 'f'",),) (type:u'application/x-python-serialize' encoding:u'binary' raw:"'\\x80\\x02}q\\x01(U\\x07expiresq\\x02NU\\x03utcq\\x03\\x88U\\x04argsq\\x04c__main__\\nf\\nq\\x05]q\\x06(K\\x01K\\x02K\\x03e\\x86q\\x07U\\x05chordq\\x08NU\\tcallbacksq\\tNU\\x08errbacksq\\nNU\\x07tasksetq\\x0bNU\\x02idq\\x0cU$27baf8bf-8ef3-496d-a445-ebd7ee94e206q\\rU\\x07retriesq\\x0eK\\x00U\\x04taskq\\x0fU\\x0ctasks.cp_mapq\\x10U\\ttimelimitq\\x11NN\\x86U\\x03etaq\\x12NU\\x06kwargsq\\x13}q\\x14u.' (233b)"')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 585, in _receive_callback
    decoded = None if on_m else message.decode()
  File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 142, in decode
    self.content_encoding, accept=self.accept)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
    return decode(data)
  File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 59, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 55, in _reraise_errors
    yield
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
    return decode(data)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 64, in pickle_loads
    return load(BytesIO(s))
DecodeError: 'module' object has no attribute 'f'
Donald Miner

3 Answers

You can use marshal to serialize the function's code to a byte string, then deserialize it in the task. I don't know if it's the best way, but it will work. You may also want to look at dill.

Here is some example code copied from another Stack Overflow answer:

import marshal

def foo(x): return x*x
# __code__ works in Python 2.6+ and Python 3; func_code exists only in Python 2
code_string = marshal.dumps(foo.__code__)

Then in the task:

import marshal, types

code = marshal.loads(code_string)
func = types.FunctionType(code, globals(), "some_func_name")

func(10)  # gives 100
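A self-contained sketch of the full round trip, with both sides in one file for illustration. It uses __code__ rather than func_code so it runs on Python 3. worker_map is a hypothetical stand-in for the body of a task like cp_map; in a real setup it would be decorated with @app.task. Keep in mind that marshal's format is not guaranteed to be stable across Python versions, so client and worker should run the same interpreter version, and that only the code object travels: default arguments, closures, and the client's globals are not sent.

```python
import marshal
import types


def square(x):
    return x * x


# Client side: marshal the function's code object into bytes.
code_bytes = marshal.dumps(square.__code__)


def worker_map(code_bytes, items):
    """Hypothetical task body: rebuild the function and map it over items."""
    code = marshal.loads(code_bytes)
    # Only the code object was sent; the rebuilt function uses this module's globals.
    func = types.FunctionType(code, globals(), "remote_func")
    return list(map(func, items))


print(worker_map(code_bytes, [1, 2, 3]))  # [1, 4, 9]
```

Returning list(...) rather than the bare map object matters on Python 3, where map is a lazy iterator that a result backend cannot serialize as-is.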
joshua
  • The 'cloud' library on PyPI contains utilities to serialize a local function and is reportedly working very well. – asksol Feb 17 '14 at 16:24
  • This solution does not work for me. I instead get an error: `kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte` – Harris Irfan May 08 '21 at 19:16
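That error is consistent with the task message being serialized as JSON (Celery's default serializer since 4.0): marshal.dumps returns raw bytes that are generally not valid UTF-8, so they cannot be shipped as text unchanged. One common workaround, swapped in here and not part of the answer above, is to base64-encode the bytes into an ASCII string before calling delay. encode_func and decode_func are hypothetical helper names; the JSON round trip stands in for the broker.

```python
import base64
import json
import marshal
import types


def encode_func(func):
    """Marshal func's code object and wrap the bytes in ASCII-safe base64 text."""
    return base64.b64encode(marshal.dumps(func.__code__)).decode("ascii")


def decode_func(text, name="remote_func"):
    """Reverse encode_func: base64-decode, unmarshal, and rebuild a function."""
    code = marshal.loads(base64.b64decode(text))
    return types.FunctionType(code, globals(), name)


def add_one(x):
    return x + 1


# The encoded payload is plain text, so a JSON serializer accepts it.
payload = json.dumps({"func": encode_func(add_one), "items": [1, 2, 3]})

# "Worker" side: parse the message and rebuild the function.
message = json.loads(payload)
f = decode_func(message["func"])
print(list(map(f, message["items"])))  # [2, 3, 4]
```

Base64 inflates the payload by about a third, but unlike a text codec trick it is explicit about carrying binary data.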

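Sometimes encoding the marshalled code of a more complex function fails with a UnicodeDecodeError, because the raw bytes are not valid text. Decoding them as ISO-8859-1 avoids this: that codec maps every byte value to a character, so the round trip is lossless. (Ignoring or replacing errors under another codec such as UTF-8 would silently corrupt the bytecode, so errors='replace' below is only a safety net that never triggers with ISO-8859-1.)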

import marshal
import types

# ISO-8859-1 maps every byte 0-255 to a character, avoiding UnicodeDecodeError (e.g. on byte 0x83)
__ENCODING_FORMAT = 'ISO-8859-1'


def _serialize_func(func):
    """Marshal func's code object and decode the bytes as ISO-8859-1 text."""
    # .decode() replaces Python 2's unicode(...) call and also works on Python 3
    return marshal.dumps(func.__code__).decode(__ENCODING_FORMAT, errors='replace')


def _load_func(code_string):
    """Re-encode the text to the original bytes and rebuild the function."""
    code = marshal.loads(code_string.encode(__ENCODING_FORMAT))
    return types.FunctionType(code, globals())
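Why the ISO-8859-1 trick is safe: the sketch below uses all 256 possible byte values as a stand-in for arbitrary marshal output, decodes them as ISO-8859-1, pushes the resulting string through a JSON round trip (standing in for a JSON task serializer), and checks that re-encoding yields the identical bytes. It also shows that UTF-8 rejects the same input.

```python
import json

# Every possible byte value, standing in for arbitrary marshal output.
raw = bytes(range(256))

# ISO-8859-1 maps each byte 0-255 to exactly one character, so this never fails...
text = raw.decode("ISO-8859-1")

# ...and the text survives a JSON round trip and re-encodes to the same bytes.
restored = json.loads(json.dumps(text)).encode("ISO-8859-1")
print(restored == raw)  # True

# UTF-8, by contrast, rejects many byte sequences.
try:
    raw.decode("utf-8")
except UnicodeDecodeError as exc:
    print("utf-8 failed:", exc.reason)
```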
NIkita Koren

This type of error also occurs if the function you pass is defined in a script that you run directly, so that it lives in the __main__ module, i.e. the file containing the function definition looks something like:

def f(*args):
    pass  # ... some code here ...

if __name__ == "__main__":
    pass  # ... some code here ...

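If this is the case, moving the function definition into a separate module, one that both the client and the worker can import, should solve the problem: pickle then records that module's name instead of __main__, and the worker can look the function up there.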

Assuming this simple refactoring is indeed applicable to your use case, it is much simpler than the above marshal jiujitsu.
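The underlying mechanism: pickle does not serialize a function's code, only a reference to it (its module and name; note the c__main__ followed by f in the raw message of the worker log in the question). The worker then looks up f in its own __main__, where it does not exist. The sketch below reproduces this in one process by building a throwaway module with the hypothetical name client_mod, pickling a function from it, and deleting the function before unpickling to simulate a worker that lacks it.

```python
import pickle
import sys
import types

# Hypothetical module standing in for the client's __main__.
mod = types.ModuleType("client_mod")
exec("def f(x):\n    return x + 1\n", mod.__dict__)
sys.modules["client_mod"] = mod

# Pickle records only the reference "client_mod" + "f", not the function body.
data = pickle.dumps(mod.f)
print(b"client_mod" in data, b"return" in data)  # True False

# Simulate a worker whose copy of the module has no attribute f.
del mod.f
try:
    pickle.loads(data)
except AttributeError as exc:
    print("unpickling failed:", exc)
```

When f lives in a real module that the worker imports under the same name, the lookup succeeds, which is why the refactoring above works.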

David Simic