
I'm currently learning Celery and trying to build a DAG-like data processing pipeline. My idea was to create a pipeline with Celery canvas; the pipeline should contain tasks that either operate on the whole list of objects or are applied to a single object and run distributed. I implemented a dataclass that holds my object and some dummy tasks, just to try out the pipeline architecture. I'm using a Docker container that runs Redis for me without any extra configuration. I also wrote a custom JSON encoder/decoder for the dataclass. I know the example tasks don't make sense; they are only there to present an MVP of my problem.

The dataclass:

from dataclasses import dataclass


@dataclass
class Car:
    car_id: int
    color: str
    tires: str
    doors: int

The JSON encoder/decoder for the dataclass:

## https://stackoverflow.com/questions/43092113/create-a-class-that-support-json-serialization-for-use-with-celery
import json
import collections.abc


def is_iterable(arg):
    # anything iterable except strings is treated as a sequence to recurse into
    return isinstance(arg, collections.abc.Iterable) and not isinstance(arg, str)


class GenericJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return super().default(obj)
        except TypeError:
            pass
        cls = type(obj)
        result = {
            '__custom__': True,
            '__module__': cls.__module__,
            '__name__': cls.__name__,
            'data': obj.__dict__ if not hasattr(cls, '__json_encode__') else obj.__json_encode__
        }
        return result


class GenericJSONDecoder(json.JSONDecoder):
    def decode(self, s):  # renamed to avoid shadowing the built-in str
        result = super().decode(s)
        return GenericJSONDecoder.instantiate_object(result)

    @staticmethod
    def instantiate_object(result):
        if not isinstance(result, dict):
            if is_iterable(result):
                return [GenericJSONDecoder.instantiate_object(v) for v in result]
            else:
                return result

        if not result.get('__custom__', False):
            return {k: GenericJSONDecoder.instantiate_object(v) for k, v in result.items()}

        import sys
        module = result['__module__']
        if module not in sys.modules:
            __import__(module)
        cls = getattr(sys.modules[module], result['__name__'])
        if hasattr(cls, '__json_decode__'):
            return cls.__json_decode__(result['data'])
        instance = cls.__new__(cls)
        data = {k: GenericJSONDecoder.instantiate_object(v) for k, v in result['data'].items()}
        instance.__dict__.update(data)
        return instance


def dumps(obj, *args, **kwargs):
    return json.dumps(obj, *args, cls=GenericJSONEncoder, **kwargs)


def loads(obj, *args, **kwargs):
    return json.loads(obj, *args, cls=GenericJSONDecoder, **kwargs)
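
To sanity-check the serializer on its own (outside Celery), a quick round trip like this works, assuming Car is importable from the module recorded in __module__:

# Standalone round-trip check of the custom serializer (no Celery involved)
car = Car(car_id=1, color="red", tires="summer", doors=2)
payload = dumps(car)        # JSON string carrying the __custom__ marker
restored = loads(payload)   # reconstructed as a Car instance
assert restored.car_id == car.car_id and restored.doors == car.doors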

My tasks:

@app.task
def get_cars_from_db():
    return [
        Car(car_id=1, color=None, tires=None, doors=2),
        Car(car_id=2, color=None, tires=None, doors=4),
        Car(car_id=3, color=None, tires=None, doors=4),
        Car(car_id=1, color=None, tires=None, doors=4),
    ]

@app.task
def paint_car(car:Car):
    car.color = "blue"
    return car


@app.task
def filter_out_two_door(car: Car):
    if car.doors == 2:
        return None
    return car


@app.task
def filter_none(cars: [Car]):
    return [c for c in cars if c]


@app.task
def change_tires(car:Car):
    car.tires = "winter"
    return car

@app.task
def write_back_whatever(cars:[Car]):
    print(cars)

@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    print(args_iter)
    print(celery_task)
    return group(celery_task(arg) for arg in args_iter)

My celery configuration:

from celery import Celery, subtask, group
from kombu.serialization import register, registry
from utils.json_encoders import dumps, loads

register("pipelineJSON", dumps, loads, content_type='application/x-pipelineJSON', content_encoding="utf-8")
registry.enable('pipelineJSON')

app = Celery('pipeline', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
app.conf["accept_content"] = ["application/x-pipelineJSON", "pipelineJSON"]
app.conf["result_serializer"] = "pipelineJSON"
app.conf["task_serializer"] = "pipelineJSON"

Now I try to build and execute the following workflow:

paint_and_filter = paint_car.s() | filter_out_two_door.s()
workflow = (get_cars_from_db.s() | dmap.s(paint_and_filter) | filter_none.s()
            | dmap.s(change_tires.s()) | write_back_whatever.s())
workflow.delay().get()

My problem is that I'm not able to pass the list result from the get-from-db task into the rest of the chain. I read up on Stack Overflow and GitHub and stumbled across dmap, but did not manage to get it working. With the example code I provided, the worker throws the following exception:

       return group(celery_task(arg) for arg in args_iter)
       TypeError: 'dict' object is not callable
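
As far as I understand, paint_and_filter arrives in dmap as its serialized dict form rather than as a callable Signature, which would explain the error. A rough sketch of what I think the worker sees (the dict contents are only an illustration, not the real payload):

# Hypothetical: after (de)serialization the signature is just a plain dict,
# so calling it like a function fails
sig_as_dict = {'task': 'paint_car', 'args': [], 'kwargs': {}}
sig_as_dict(Car(car_id=1, color=None, tires=None, doors=2))
# TypeError: 'dict' object is not callable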

I also tried to wrap celery_task(arg) into a subtask as follows:

return group(subtask(celery_task)(arg) for arg in args_iter)

which creates the following error on the worker:

  File "/Users/utils/json_encoders.py", line 23, in default
  'data': obj.__dict__ if not hasattr(cls, '__json_encode__') else obj.__json_encode__
   kombu.exceptions.EncodeError: 'mappingproxy' object has no attribute '__dict__'

I tried to draw a picture of what I'm trying to achieve: this is basically the workflow I'm trying to build.

I'm using Celery 5.0.2 and Python 3.8.3.

I would be very thankful if somebody could help me out here. How can I get this dmap working? Is there another or better solution for what I'm trying to achieve? Thanks in advance.
