I want to pass a function as a parameter to a celery task. I found two similar questions in stackoverflow (1 and 2). I tried the solutions mentioned in the answers, and this is what I'm currently doing:
Inside caller module:
import marshal
def function_to_be_passed:
# ...
serialized_func_code = marshal.dumps(function_to_be_passed.__code__)
celery_task.delay(serialized_func_code)
And inside the celery task, I'm deserializing and calling the function:
import marshal, types
from celery.task import task
@task()
def celery_task(serialized_func_code):
code = marshal.loads(serialized_func_code)
func = types.FunctionType(code, globals(), "some_func_name")
# calling the deserialized function
func()
However, upon calling celery_task.delay(serialized_func_code)
, I get an error:
kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte
Sharing stack trace below:
File “...caller.py",
celery_task.delay(
File "/python3.8/site-packages/celery/app/task.py", line 426, in delay
return self.apply_async(args, kwargs)
File "/python3.8/site-packages/celery/app/task.py", line 555, in apply_async
content_type, content_encoding, data = serialization.dumps(
File "/python3.8/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/python3.8/site-packages/kombu/serialization.py", line 54, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/python3.8/site-packages/vine/five.py", line 194, in reraise
raise value.with_traceback(tb)
File "/python3.8/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/python3.8/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/python3.8/site-packages/kombu/utils/json.py", line 69, in dumps
return _dumps(s, cls=cls or _default_encoder,
File "/python3.8/site-packages/simplejson/__init__.py", line 398, in dumps
return cls(
File "/python3.8/site-packages/simplejson/encoder.py", line 296, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/python3.8/site-packages/simplejson/encoder.py", line 378, in iterencode
return _iterencode(o, 0)
kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte
Similar error occurs when I use dumps
and loads
provided by pickle.
kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
I'm using Django version 2.2.17 and Python version 3.8.5.