
I have a running Celery server using Redis as the broker and result backend (Python 3). I would like an arbitrary function, which has not been registered with the server, to be executed by a Celery worker. I tried to serialize this function using the marshal module (following "Is there an easy way to pickle a python function (or otherwise serialize its code)?") and transferred the bytecode to a worker:

celery_server.py:

from celery import Celery
import types
import marshal

app = Celery('tasks', broker='redis://guest@localhost//', backend='redis://localhost')

@app.task
def run_fct(fct_code, args, kwargs):
    # Rebuild a function object from the marshalled code object
    code = marshal.loads(fct_code)
    func = types.FunctionType(code, globals(), "some_func_name")

    return func(*args, **kwargs)

client.py:

from celery_server import run_fct
import marshal

def calc(x, y):
    return x * y

fct_code = marshal.dumps(calc.__code__)
run_fct.apply_async((fct_code, (10, 2), {}))

I get the following error on the client side:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte

It is raised in the function bytes_to_str in kombu/utils/encoding.py.
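The traceback suggests kombu tried to turn the raw marshal bytes into text, which arbitrary bytecode is not; the standard library's json module rejects bytes outright with a TypeError. One possible way to keep a JSON serializer, sketched here as an assumption rather than a tested Celery setup, is to base64-encode the marshalled code into an ASCII string before sending it. The snippet simulates the broker round trip with json.dumps/json.loads instead of a real Celery call.

```python
import base64
import json
import marshal
import types


def calc(x, y):
    return x * y


raw = marshal.dumps(calc.__code__)  # arbitrary bytes, not UTF-8 text

# JSON has no bytes type, so the raw marshal output cannot be sent as-is.
try:
    json.dumps([raw, (10, 2), {}])
    raw_is_json_safe = True
except TypeError:
    raw_is_json_safe = False

# Workaround (assumption): base64 turns the bytes into an ASCII str that JSON accepts.
encoded = base64.b64encode(raw).decode("ascii")
message = json.loads(json.dumps([encoded, [10, 2], {}]))  # simulated broker round trip

# Worker side: undo the base64 step, then rebuild the function as run_fct does.
fct_code, args, kwargs = message
code = marshal.loads(base64.b64decode(fct_code))
func = types.FunctionType(code, globals(), "some_func_name")
print(func(*args, **kwargs))  # 20
```

In run_fct, this would mean calling marshal.loads(base64.b64decode(fct_code)). Since marshal's format may change between Python versions, client and worker should presumably run the same Python version.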

Is there another, or better, way to have my function executed?

Thank you for any help.

desiato

1 Answer


I found a solution: wrapping my function in an object and passing that object to the Celery task as an argument, with serializer='pickle', solves the UnicodeDecodeError:

celery_server.py:

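class MyFunction(object):
    def __init__(self, fct):
        self.fct_code = marshal.dumps(fct.__code__)

    def run(self, *args, **kwargs):
        return run_fct.apply_async(args=(self,) + args, kwargs=kwargs, serializer='pickle')

# replaces the run_fct task from the question: it now receives the wrapper
@app.task
def run_fct(myFct, *args, **kwargs):
    func = types.FunctionType(marshal.loads(myFct.fct_code), globals(), "some_func_name")
    return func(*args, **kwargs)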

and in client.py:

from celery_server import MyFunction

def calc(x, y):
    return x * y

myFct = MyFunction(calc)
myFct.run(x=10, y=2)

Then it works like a charm.
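The pickle serializer is presumably what makes the difference: pickle stores bytes attributes and plain class instances natively, whereas JSON has no bytes type. The standalone sketch below imitates the broker round trip with pickle.dumps/pickle.loads (Celery's real message layout differs). Two caveats: Celery 4 and later accept only JSON by default, so a worker there would need 'pickle' in its accept_content setting; and pickle re-imports MyFunction by module and name when loading, so the class must be importable on the worker, as it is when defined in celery_server.py.

```python
import marshal
import pickle
import types


class MyFunction(object):
    def __init__(self, fct):
        self.fct_code = marshal.dumps(fct.__code__)


def calc(x, y):
    return x * y


# Client side: roughly what apply_async(..., serializer='pickle') puts on the wire.
wire = pickle.dumps(((MyFunction(calc),), {"x": 10, "y": 2}))

# Worker side: unpickle the arguments and rebuild the function from its code.
(my_fct,), kwargs = pickle.loads(wire)
code = marshal.loads(my_fct.fct_code)
func = types.FunctionType(code, globals(), "some_func_name")
print(func(**kwargs))  # 20
```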

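However, using the cloud package also solves dependency problems on the worker side, for instance if my function uses additional functions or packages: marshal transfers only the code object, whereas cloud's cloudpickle also serializes references to the globals, helper functions and modules the function uses.

tasks.py: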

from celery import Celery
import cloud
import pickle

app = Celery('tasks', broker='redis://guest@localhost//', backend='redis://localhost')

class MyFunction(object):
    def __init__(self, fct):
        self.serialized_code = cloud.serialization.cloudpickle.dumps(fct)

    def run(self, *args, **kwargs):
        run_fct.apply_async(args=(self,) + args, kwargs=kwargs, serializer='pickle')

@app.task
def run_fct(myFct, *args, **kwargs):
    fct = pickle.loads(myFct.serialized_code)

    return fct(*args, **kwargs)

client.py:

from tasks import MyFunction
import time

def calc(x, y):
    time.sleep(5)
    return x * y

myFct = MyFunction(calc)
myFct.run(x=10, y=2)
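The dependency problem can be reproduced without Celery. This stdlib-only sketch rebuilds a calc() like the one above from its marshalled code against a fresh globals dictionary, standing in for a worker module that never imported time; worker_globals is a hypothetical name, and the sleep is shortened.

```python
import builtins
import marshal
import time
import types


def calc(x, y):
    time.sleep(0.01)  # depends on the "time" module being in the function's globals
    return x * y


code = marshal.loads(marshal.dumps(calc.__code__))

# Fresh globals with only builtins: what a worker module without "import time" offers.
worker_globals = {"__builtins__": builtins}
rebuilt = types.FunctionType(code, worker_globals, "calc")

try:
    rebuilt(10, 2)
    failed_without_time = False
except NameError as exc:
    failed_without_time = True
    print("marshal-only approach fails:", exc)

# The function keeps a reference to worker_globals, so adding the module now fixes it.
worker_globals["time"] = time
print(rebuilt(10, 2))  # 20
```

Adding names by hand only works for dependencies known in advance, which is the gap cloudpickle closes.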

But: the current version of cloud does not support Python 3.
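The cloudpickle serializer that cloud bundled is also published as a standalone package, cloudpickle, which does support Python 3. A minimal sketch, assuming it is installed (pip install cloudpickle) on both client and worker; the worker needs it importable even though it loads the payload with the standard pickle module.

```python
import pickle
import time

import cloudpickle  # assumption: standalone package, installed on client and worker


def calc(x, y):
    time.sleep(0.01)  # module dependency, as in the calc() above (shorter sleep)
    return x * y


# Client side: serialize the function by value into plain bytes.
serialized_code = cloudpickle.dumps(calc)

# Worker side: the standard pickle module can load cloudpickle output.
fct = pickle.loads(serialized_code)
print(fct(x=10, y=2))  # 20
```

In the tasks.py above, this would presumably amount to replacing cloud.serialization.cloudpickle.dumps(fct) with cloudpickle.dumps(fct).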

desiato