It turned out that there is an experimental feature (which will hopefully become official) that lets you create tasks from the elements of an iterable output. Here is the working code:
from typing import List

from dagster import execute_pipeline, pipeline, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition


@solid
def return_some_list(context):
    # The iterable we want to fan out over.
    return [1, 2, 3, 4, 5]


# DynamicOutputDefinition(int): every dynamic output yielded below is an int.
@solid(output_defs=[DynamicOutputDefinition(int)])
def generate_subtasks(context, nums: List[int]):
    context.log.info(str(nums))
    for num in nums:
        # One DynamicOutput per element; mapping_key names the resulting subtask.
        yield DynamicOutput(num, mapping_key=f'subtask_{num}')


@solid
def print_num(context, some_num: int):
    context.log.info(str(some_num))
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    # Run print_num once for every dynamic output of generate_subtasks.
    generate_subtasks(output_list).map(print_num)


if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)
Here, return_some_list returns an iterable, and we want to run a solid for each of its elements. We do this in the solid generate_subtasks, which yields one DynamicOutput per element, carrying the element itself and a mapping_key that names the subtask generated for it. The type of these outputs is declared by the DynamicOutputDefinition in the solid's output_defs.
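The yield-per-element pattern can be modeled in plain Python, without Dagster, to show what generate_subtasks effectively hands to the engine: a stream of (mapping_key, value) pairs. This is only a sketch, not Dagster's implementation. The helper check_mapping_key is hypothetical, and the two rules it enforces (keys made only of letters, digits and underscores, and no key reused within one output) are my understanding of Dagster's constraints on mapping keys, not something shown in the code above.

```python
import re
from typing import Iterator, List, Set, Tuple

# Assumed rule: mapping keys may only contain letters, digits and underscores.
_KEY_PATTERN = re.compile(r"[A-Za-z0-9_]+")


def check_mapping_key(key: str, seen: Set[str]) -> str:
    """Hypothetical validator for a mapping key (not a Dagster API)."""
    if not _KEY_PATTERN.fullmatch(key):
        raise ValueError(f"invalid mapping key: {key!r}")
    if key in seen:
        # Assumed rule: each key names one subtask, so it must be unique.
        raise ValueError(f"duplicate mapping key: {key!r}")
    seen.add(key)
    return key


def generate_subtasks(nums: List[int]) -> Iterator[Tuple[str, int]]:
    """Plain-Python stand-in for the solid: one (mapping_key, value) pair per element."""
    seen: Set[str] = set()
    for num in nums:
        yield check_mapping_key(f"subtask_{num}", seen), num


pairs = list(generate_subtasks([1, 2, 3, 4, 5]))
print(pairs)
# [('subtask_1', 1), ('subtask_2', 2), ('subtask_3', 3), ('subtask_4', 4), ('subtask_5', 5)]
```

Feeding in a list with a repeated element, such as [1, 1], makes the second key collide and raises ValueError, so keys derived from the element itself only work when the elements are unique.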
To connect these solids, we first get the list via return_some_list. Then we call generate_subtasks, which is a generator, and map the print_num solid over each of its outputs.
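The .map(print_num) call fans the downstream solid out once per dynamic output. The plain-Python sketch below is again a model, not Dagster's engine, and map_dynamic is a hypothetical name: it runs a function once per (mapping_key, value) pair and stores each result under a step name of the form solid[mapping_key], which is the naming that shows up in the pipeline's log, e.g. print_num[subtask_4].

```python
from typing import Callable, Dict, Iterable, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_dynamic(
    solid_name: str,
    outputs: Iterable[Tuple[str, T]],
    fn: Callable[[T], R],
) -> Dict[str, R]:
    """Run fn once per dynamic output, keyed by a 'solid[mapping_key]' step name."""
    results: Dict[str, R] = {}
    for mapping_key, value in outputs:
        results[f"{solid_name}[{mapping_key}]"] = fn(value)
    return results


def print_num(some_num: int) -> int:
    # Same body as the solid, minus the Dagster context and logger.
    print(some_num)
    return some_num


# Upstream: the list from return_some_list, fanned out as generate_subtasks does.
dynamic_outputs = [(f"subtask_{n}", n) for n in [1, 2, 3, 4, 5]]
results = map_dynamic("print_num", dynamic_outputs, print_num)
print(results["print_num[subtask_4]"])  # 4
```

The sequential loop is a simplification: Dagster treats each mapped step as a separate step, so with a multiprocess executor they can run in parallel and in no guaranteed order.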
Running the whole pipeline prints a lot of log output for each subtask generated by generate_subtasks; the steps are named print_num[subtask_1] through print_num[subtask_5]. Part of the output looks like this:
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_SUCCESS - Finished execution of step "print_num[subtask_4]" in 2.1ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_START - Started execution of step "print_num[subtask_5]".
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - LOADED_INPUT - Loaded input "some_num" using input manager "io_manager", from output "result" of step "test"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_INPUT - Got input "some_num" of type "Int". (Type check passed).
2021-03-13 21:27:53 - dagster - INFO - system - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - print_num[subtask_5] - 5
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_SUCCESS - Finished execution of step "print_num[subtask_5]" in 1.98ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - ENGINE_EVENT - Finished steps in process (pid: 33738) in 44ms
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - PIPELINE_SUCCESS - Finished execution of pipeline "some_pipeline".
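Oh, and the cool thing: Dagster type-checks the inputs and outputs of every step (the "Type check passed" lines in the log above) and fails the step when a check does not pass. So if we were to map a solid such as print_str, declared with a str input, over these int outputs, its input type check would fail and the solid's body would never run.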
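A rough plain-Python model of that input check: before a step's body runs, each input value is checked against its declared type, and a mismatch stops the step. This is a simplified sketch; run_step, TypeCheckError and print_str are hypothetical, and Dagster's own type system (Dagster types with their own type-check logic) is richer than the bare isinstance test used here.

```python
from typing import Any, Callable, Dict


class TypeCheckError(Exception):
    """Raised when an input value does not match its declared type."""


def run_step(fn: Callable[..., Any], inputs: Dict[str, Any]) -> Any:
    """Check each input against fn's annotations before calling fn (hypothetical helper)."""
    annotations = fn.__annotations__
    for name, value in inputs.items():
        expected = annotations.get(name)
        if expected is not None and not isinstance(value, expected):
            raise TypeCheckError(
                f"input {name!r} expected {expected.__name__}, got {type(value).__name__}"
            )
    return fn(**inputs)


def print_num(some_num: int) -> int:
    return some_num


def print_str(some_str: str) -> str:
    # Never reached when given an int: the check in run_step fails first.
    return some_str


print(run_step(print_num, {"some_num": 5}))  # 5
try:
    run_step(print_str, {"some_str": 5})
except TypeCheckError as err:
    print(err)  # input 'some_str' expected str, got int
```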