You have two main options for doing this:
- Use a single job with Dynamic Outputs:
With this setup, all of your ETLs would happen in a single job. You would have an initial op that would yield a DynamicOutput for each table name that you wanted to do this process for, and feed that into a set of ops (probably organized into a graph) that would be run on each individual DynamicOutput.
Depending on which executor you're using, you can limit the overall step concurrency (for example, the default multiprocess_executor exposes a max_concurrent option for this).
- Create a configurable job (I think this is more likely what you want):
from dagster import job, op, graph
import pandas as pd


@op(config_schema={"table_name": str})
def extract_table(context) -> pd.DataFrame:
    table_name = context.op_config["table_name"]
    # do some load...
    return pd.DataFrame()


@op
def transform_table(table: pd.DataFrame) -> pd.DataFrame:
    # do some transform...
    return table


@op(config_schema={"table_name": str})
def load_table(context, table: pd.DataFrame):
    table_name = context.op_config["table_name"]
    # load to snowflake...


@job
def configurable_etl():
    load_table(transform_table(extract_table()))


# this is what the configuration would look like to extract from table
# src_foo and load into table dest_foo
configurable_etl.execute_in_process(
    run_config={
        "ops": {
            "extract_table": {"config": {"table_name": "src_foo"}},
            "load_table": {"config": {"table_name": "dest_foo"}},
        }
    }
)
Here, you create a job that can be pointed at a source table and a destination table by giving the relevant ops a config schema. Depending on those config options (which are provided through the run config when you create a run), your job will operate on different source and destination tables.
The example runs the job explicitly through the Python API, but if you launch it from Dagit, you can enter the YAML version of this config there. If you want to simplify the config schema (it's pretty nested as shown), you can always create a Config Mapping to make the interface nicer :)
From here, you can limit run concurrency by supplying a unique tag to your job, and using a QueuedRunCoordinator to limit the maximum number of concurrent runs for that tag.