I'm currently learning Celery and trying to build a DAG-like data processing pipeline. My idea was to create a pipeline with Celery canvas; the pipeline should contain tasks that either operate on the whole list of objects or are applied to a single object and distributed across workers. I implemented a dataclass that holds my object and some dummy tasks just to try out the pipeline architecture. I'm using a Docker container that runs Redis for me without any extra configuration. I also wrote a custom JSON encoder/decoder for the dataclass. I know that the example tasks do not make sense; they are just for presenting an MVP of my problem.
The dataclass:
from dataclasses import dataclass


@dataclass
class Car:
    car_id: int
    color: str
    tires: str
    doors: int
The JSON encoder/decoder for the dataclass:
## https://stackoverflow.com/questions/43092113/create-a-class-that-support-json-serialization-for-use-with-celery
import json
import collections.abc
import six


def is_iterable(arg):
    return isinstance(arg, collections.abc.Iterable) and not isinstance(arg, six.string_types)


class GenericJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return super().default(obj)
        except TypeError:
            pass
        # fall back to encoding arbitrary objects with class metadata
        cls = type(obj)
        result = {
            '__custom__': True,
            '__module__': cls.__module__,
            '__name__': cls.__name__,
            'data': obj.__dict__ if not hasattr(cls, '__json_encode__') else obj.__json_encode__
        }
        return result


class GenericJSONDecoder(json.JSONDecoder):
    def decode(self, str):
        result = super().decode(str)
        return GenericJSONDecoder.instantiate_object(result)

    @staticmethod
    def instantiate_object(result):
        if not isinstance(result, dict):
            if is_iterable(result):
                return [GenericJSONDecoder.instantiate_object(v) for v in result]
            else:
                return result
        if not result.get('__custom__', False):
            return {k: GenericJSONDecoder.instantiate_object(v) for k, v in result.items()}
        # re-import the original module and rebuild the instance from its data
        import sys
        module = result['__module__']
        if module not in sys.modules:
            __import__(module)
        cls = getattr(sys.modules[module], result['__name__'])
        if hasattr(cls, '__json_decode__'):
            return cls.__json_decode__(result['data'])
        instance = cls.__new__(cls)
        data = {k: GenericJSONDecoder.instantiate_object(v) for k, v in result['data'].items()}
        instance.__dict__.update(data)
        return instance


def dumps(obj, *args, **kwargs):
    return json.dumps(obj, *args, cls=GenericJSONEncoder, **kwargs)


def loads(obj, *args, **kwargs):
    return json.loads(obj, *args, cls=GenericJSONDecoder, **kwargs)
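For reference, this is roughly how I expect the encoder/decoder to behave on its own, outside of Celery (a quick sketch, assuming Car and utils.json_encoders are importable from where this is run):

from utils.json_encoders import dumps, loads

car = Car(car_id=1, color="red", tires="summer", doors=4)
payload = dumps(car)       # JSON string carrying __custom__/__module__/__name__ metadata
restored = loads(payload)  # should be a Car instance again
print(restored)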
My tasks:
@app.task
def get_cars_from_db():
    return [Car(car_id=1, color=None, tires=None, doors=2),
            Car(car_id=2, color=None, tires=None, doors=4),
            Car(car_id=3, color=None, tires=None, doors=4),
            Car(car_id=1, color=None, tires=None, doors=4)]


@app.task
def paint_car(car: Car):
    car.color = "blue"
    return car


@app.task
def filter_out_two_door(car: Car):
    if car.doors == 2:
        return None
    return car


@app.task
def filter_none(cars: [Car]):
    return [c for c in cars if c]


@app.task
def change_tires(car: Car):
    car.tires = "winter"
    return car


@app.task
def write_back_whatever(cars: [Car]):
    print(cars)


@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    print(args_iter)
    print(celery_task)
    return group(celery_task(arg) for arg in args_iter)
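To make the intent of dmap concrete, what I expect dmap.s(change_tires.s()) to do with the list produced by the previous task is essentially this hand-written fan-out (just an illustration with a fixed list):

cars = [Car(car_id=1, color=None, tires=None, doors=2),
        Car(car_id=2, color=None, tires=None, doors=4)]
# one change_tires task per car, scheduled as a group
group(change_tires.s(car) for car in cars).delay()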
My Celery configuration:
from celery import Celery, subtask, group
from kombu.serialization import register, registry

from utils.json_encoders import dumps, loads

register("pipelineJSON", dumps, loads, content_type='application/x-pipelineJSON', content_encoding="utf-8")
registry.enable('pipelineJSON')

app = Celery('pipeline', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
app.conf["accept_content"] = ["application/x-pipelineJSON", "pipelineJSON"]
app.conf["result_serializer"] = "pipelineJSON"
app.conf["task_serializer"] = "pipelineJSON"
Now I try to build and execute the following workflow:
paint_and_filter = paint_car.s() | filter_out_two_door.s()
workflow = (get_cars_from_db.s() | dmap.s(paint_and_filter) | filter_none.s() |
            dmap.s(change_tires.s()) | write_back_whatever.s())
workflow.delay().get()
My problem is that I'm not able to pass the list result from the get_cars_from_db task into the per-object chain. I read up on Stack Overflow and GitHub and stumbled across the dmap approach, but did not manage to get it working. With the example code I provided, the worker throws the following exception:
return group(celery_task(arg) for arg in args_iter)
TypeError: 'dict' object is not callable
I also tried to wrap celery_task(arg) in a subtask as follows:
return group(subtask(celery_task)(arg) for arg in args_iter)
which creates the following error on the worker:
File "/Users/utils/json_encoders.py", line 23, in default
'data': obj.__dict__ if not hasattr(cls, '__json_encode__') else obj.__json_encode__
kombu.exceptions.EncodeError: 'mappingproxy' object has no attribute '__dict__'
I tried to draw a picture of what I'm trying to achieve.
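In plain, non-distributed Python (just calling the task functions directly to illustrate the intended data flow), it would look roughly like this:

cars = get_cars_from_db()                                 # one task producing the full list
cars = [filter_out_two_door(paint_car(c)) for c in cars]  # fan out: per-car chain
cars = filter_none(cars)                                  # fan in: one task over the whole list
cars = [change_tires(c) for c in cars]                    # fan out again, per car
write_back_whatever(cars)                                 # final task over the whole list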
I'm using Celery 5.0.2 and Python 3.8.3.
I would be very thankful if somebody could help me out here. How can I get this dmap working? Is there another or better solution for what I'm trying to achieve? Thanks in advance.