I have a Dagster pipeline consisting of two solids (reproducible example below). The first (return_some_list) outputs a list of some objects. The second solid (print_num) accepts an element from the first list (not the full list) and does some processing on that element.

How am I supposed to call the second solid for each element of the list returned by the first solid? Please explain any best practices as well.

Not sure if this is the best approach (let me know), but I'd like to produce a different solid instance of print_num for each element of the first solid's output. This will help me parallelize the solid in the future and handle long / compute-intensive solids better.

from dagster import execute_pipeline, pipeline, solid

@solid
def return_some_list(context):
    return [1,2,3,4,5]

@solid
def print_num(context, some_num: int):
    print(some_num)
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    for some_num in output_list:
        print_num(some_num)

if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)
mit
cyau

2 Answers

It turns out there is an experimental feature (which will hopefully become official) that allows tasks to be created from the elements of an iterable output. The working code is below:

from dagster import execute_pipeline, pipeline, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from typing import List


@solid
def return_some_list(context):
    return [1, 2, 3, 4, 5]


@solid(output_defs=[DynamicOutputDefinition(int)])
def generate_subtasks(context, nums: List[int]):
    context.log.info(str(nums))
    for num in nums:
        yield DynamicOutput(num, mapping_key=f'subtask_{num}')


@solid
def print_num(context, some_num: int):
    context.log.info(str(some_num))
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    generate_subtasks(output_list).map(print_num)


if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)

Here, return_some_list returns an iterable, and we want to run a solid for each of its elements. We do this in the solid generate_subtasks, which yields one DynamicOutput per element, carrying the element itself and a mapping_key that names the subtask generated for it. The DynamicOutput's type is declared by the DynamicOutputDefinition passed to the solid decorator.

To connect these solids, we first get the list from return_some_list. Then we call generate_subtasks, which is a generator, and map print_num over each of its outputs.
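
One way to picture the fan-out described above is the plain-Python model below. It is only a sketch and not Dagster code: `subtasks` and `map_over` are hypothetical stand-ins for generate_subtasks and `.map`, and the real framework schedules each call as its own step instead of running a loop. The step-like names it builds follow the `print_num[subtask_N]` pattern that shows up in the run log.

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple


def subtasks(nums: Iterable[int]) -> Iterator[Tuple[str, int]]:
    """Stand-in for generate_subtasks: yield a (mapping_key, value) pair per element."""
    for num in nums:
        yield f"subtask_{num}", num


def map_over(
    pairs: Iterable[Tuple[str, int]], fn: Callable[[int], int]
) -> Dict[str, int]:
    """Stand-in for .map(fn): call fn once per pair, keyed by a step-like name."""
    results: Dict[str, int] = {}
    for key, value in pairs:
        step_name = f"{fn.__name__}[{key}]"
        if step_name in results:
            # This model rejects duplicate keys so every step name stays unique.
            raise ValueError(f"duplicate mapping key: {key}")
        results[step_name] = fn(value)
    return results


def print_num(some_num: int) -> int:
    print(some_num)
    return some_num


# Each element of the list gets its own named call of print_num.
steps = map_over(subtasks([1, 2, 3, 4, 5]), print_num)
```

The point of the model is that the mapping key, not the position in the list, identifies each subtask, which is why the keys need to be distinct.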

Running the whole pipeline should print a whole lot of information for each of the subtasks generated by generate_subtasks, looking like this (only part of the output shown):

2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_SUCCESS - Finished execution of step "print_num[subtask_4]" in 2.1ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_START - Started execution of step "print_num[subtask_5]".
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - LOADED_INPUT - Loaded input "some_num" using input manager "io_manager", from output "result" of step "test"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_INPUT - Got input "some_num" of type "Int". (Type check passed).
2021-03-13 21:27:53 - dagster - INFO - system - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - print_num[subtask_5] - 5
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_SUCCESS - Finished execution of step "print_num[subtask_5]" in 1.98ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - ENGINE_EVENT - Finished steps in process (pid: 33738) in 44ms
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - PIPELINE_SUCCESS - Finished execution of pipeline "some_pipeline".

Oh, and the cool thing: Dagster performs type checking and fails fast if you give it a wrongly typed argument. So, if we were to pass a solid that expects a string (a hypothetical print_str, say) to the map function, it would refuse to even run.

cyau
  • This really helps and solves what I am looking for, but it makes the solids run **really slow**. Also, `.map(print_num)` needs to **map all the values first before the next solid starts**. I am gonna try the solid factory approach and hope that makes it faster. – metinsenturk Apr 13 '21 at 20:35
  • @metinsenturk, if it works, please share your results as an answer :) – cyau Apr 14 '21 at 08:30
  • @cyau which version of dagster are you using? – Shameel Faraz Nov 22 '22 at 13:21
  • @ShameelFaraz, quite old - probably around 0.10. A lot has changed since... – cyau Nov 22 '22 at 15:14

Update in 2022 (a year later):

Some Dagster naming has changed: a solid is now an op, and a pipeline is now called a job.

I introduced a configuration, config_parallel, where you can fine-tune the number of concurrent processes.

I tried to keep more or less the same naming. My import style is different, but you get the idea.

import time
from typing import List

import dagster

# Run config for the multiprocess executor: at most 4 steps run at once.
config_parallel = {
    'execution': {
        'config': {
            'multiprocess': {
                'max_concurrent': 4,
            },
        },
    },
}

@dagster.op
def return_some_list():
    return [1, 2, 3, 4, 5]

@dagster.op(
    output_defs = [
        dagster.DynamicOutputDefinition(int),
    ],
)
def generate_subtasks(context, nums: List[int]):
    context.log.info(str(nums))
    # Yield one DynamicOutput per element, each with its own mapping_key.
    for num in nums:
        yield dagster.DynamicOutput(
            num, mapping_key=f'subtask_{num}'
        )

@dagster.op
def print_num_and_heavy_lifting(context, data: int):
    seconds = 5
    context.log.info(
        f"Data: {data}. Will sleep for {seconds} seconds to simulate some work."
    )
    time.sleep(seconds)


@dagster.job(config = config_parallel)
def some_job():
    '''
    Use 'map'; a plain 'for' loop does not work here.
    '''
    output_list = return_some_list()
    generate_subtasks(output_list).map(print_num_and_heavy_lifting)


if __name__ == "__main__":
    # execute_in_process is a method of the job, so it needs no separate import.
    # It swaps in the in-process executor, so the 4-way parallelism only shows
    # up when the job is launched another way, e.g. from Dagit.
    result = some_job.execute_in_process()

In this screenshot from Dagit you can see that 4 of the 'heavy lifting' processes actually run at the same time, as configured. The 5th appears below them as soon as one of them has finished.

[Image: Dagit screenshot of the run]

mit