7

I have a directed acyclic graph in networkx. Each node represents a task, and a node's predecessors are task dependencies (a given task cannot execute until its dependencies have executed).

I'd like to 'execute' the graph in an asynchronous task queue, similar to what Celery offers (so that I can poll jobs for their status, retrieve results, etc.). Celery doesn't offer the ability to create DAGs (as far as I know), and the ability to move on to a task as soon as all of its dependencies are complete would be crucial (a DAG may have multiple paths, and even if one task is slow/blocking, it may be possible to move on to other tasks).

Are there any simple examples as to how I could achieve this, or perhaps even integrate networkx with celery?

jramm
    What you may be searching for is called dask: http://dask.pydata.org/en/latest/custom-graphs.html?highlight=graph – denfromufa Apr 19 '16 at 00:59
  • Did you find a solution to this? I am trying to do something where I read in a graph that has been pickled into the db, and then unpickle it within my celery task, but it returns me no data. However without celery it works. – mp252 Mar 04 '22 at 17:15

3 Answers

1

One library that you could use for this is taskgraph. It allows you to define a graph of tasks and then executes them in a multi-thread/multi-process way. It avoids re-running tasks whose results are already up-to-date, similar to the make program.

To execute your networkx graph, you would iterate over all the nodes in topological order, collect the immediate dependencies of each, and call task_graph.add_task. This function returns a handle to the newly added task, which lets you use it as a dependency for subsequently added tasks (this is why the node iteration order is important).
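The iteration described above can be sketched as follows. Note that `add_task` here is a hypothetical stand-in, not taskgraph's actual API (the real call takes more arguments, e.g. the function to execute); the sketch only shows how networkx's topological order guarantees that every dependency handle already exists when a task is added:

```python
import networkx as nx

# Example dependency DAG: edges point from a dependency to its dependent.
G = nx.DiGraph([("load", "clean"), ("clean", "train"),
                ("load", "report"), ("train", "report")])

handles = {}

def add_task(name, dependent_tasks):
    # Placeholder for the real task-queue call: a real implementation would
    # schedule `name` to run once every handle in `dependent_tasks` is done.
    return (name, dependent_tasks)

for node in nx.topological_sort(G):
    # Immediate dependencies; topological order guarantees these handles exist.
    deps = [handles[p] for p in G.predecessors(node)]
    handles[node] = add_task(node, dependent_tasks=deps)
```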

For alternative solutions, see also this question.

Martin Cejp
1

I'm a bit late to the party, but one possibility is to use dask for constructing custom DAGs and then executing them, see https://docs.dask.org/en/stable/graphs.html.
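Dask's custom-graph format is just a dict mapping keys to either literal values or `(callable, *args)` tuples whose string arguments refer to other keys; dask then executes such a dict with one of its schedulers (e.g. `dask.get` or `dask.threaded.get`). A minimal pure-Python evaluator (written here so the sketch runs without dask installed) shows the idea:

```python
def inc(x):
    return x + 1

def add(x, y):
    return x + y

# A dask-style custom graph: "b" depends on "a", "c" depends on "a" and "b".
dsk = {"a": 1, "b": (inc, "a"), "c": (add, "a", "b")}

def get(graph, key):
    """Minimal recursive evaluator for the dict-of-tuples graph format."""
    value = graph[key]
    if isinstance(value, tuple):
        func, *args = value
        return func(*(get(graph, a) for a in args))
    return value

# get(dsk, "c") evaluates a=1, then b=inc(1)=2, then add(1, 2) -> 3
```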

SultanOrazbayev
-1

I think this function may help:

  # The graph G is represented by a dictionary following this pattern:
  # G = { vertex: [ (successor1, weight1), (successor2, weight2), ... ] }
  # `execute` is assumed to be defined elsewhere by the caller.
  def progress(G, start):
     Q = [start]   # tasks waiting to execute
     done = []     # tasks already executed
     while len(Q) > 0:    # still tasks to execute?
        task = Q.pop(0)   # pick up the oldest one
        if task in done:  # skip tasks enqueued more than once
           continue
        ready = True
        for T in G:       # make sure all predecessors are executed
           for S, w in G[T]:
              if S == task and T not in done:  # found an unexecuted predecessor
                 ready = False
                 break
           if not ready: break
        if not ready:
           Q.append(task)        # the task is not ready yet; re-queue it
        else:
           execute(task)         # execute the task
           done.append(task)
           for S, w in G[task]:  # and enqueue all its successors
              Q.append(S)
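For a networkx graph like the one in the question, the same ready-check can be written directly against the graph's predecessors instead of scanning a dict of successor lists; a self-contained sketch, with the task body passed in as a callable:

```python
from collections import deque

import networkx as nx

def run_dag(G, execute):
    """Run every node of a networkx DAG once all of its predecessors have run.
    `execute` is a user-supplied callable invoked with each node."""
    done = set()
    # Start from the tasks that have no dependencies at all.
    queue = deque(n for n in G if G.in_degree(n) == 0)
    while queue:
        task = queue.popleft()
        if task in done:
            continue
        execute(task)
        done.add(task)
        # A successor becomes ready once all of its predecessors are done.
        for succ in G.successors(task):
            if all(p in done for p in G.predecessors(succ)):
                queue.append(succ)

# Diamond-shaped DAG: "a" fans out to "b" and "c", which both feed "d".
order = []
G = nx.DiGraph([("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")])
run_dag(G, order.append)
```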
nino_701