As mentioned, you can control this at the flow level using the `--max-workers` flag.
To permanently override the `--max-workers` flag for a flow, you can use the decorator below. The same decorator can also be used to override other Metaflow flags, such as `--max-num-splits`.
def fix_cli_args(**kwargs: str):
    """
    Decorator to override Metaflow CLI arguments.

    Each keyword is a CLI flag and its value is what that flag is forced to.
    Flag names are not valid Python identifiers, so pass them by unpacking a
    dict (see Usage). Values are converted with ``str`` before being written
    to ``sys.argv``.

    The override is applied when the decorated flow is called, and only if
    ``"run"`` or ``"resume"`` appears in ``sys.argv``; otherwise the flow is
    called with ``sys.argv`` untouched.

    Usage:
        @fix_cli_args(**{"--max-workers": "1", "--max-num-splits": "100"})
        class InferencePipeline(FlowSpec): ...

    Warnings:
        If the argument is specified by the user (as ``--flag value``,
        ``--flag=value``, or a trailing ``--flag`` with no value), it is
        replaced by the value given to the decorator and a warning is logged.

    Returns:
        A decorator that wraps the flow in a callable which rewrites
        ``sys.argv`` and then returns ``pipeline(...)``.
    """
    # Local import so this block does not depend on the file's import header.
    import functools

    def decorator(pipeline):
        # updated=() keeps the flow's name/docstring on the wrapper without
        # copying the class __dict__ into the function's attributes.
        @functools.wraps(pipeline, updated=())
        def wrapper(*flow_args, **flow_kwargs):
            if "run" not in sys.argv and "resume" not in sys.argv:
                # ignore this decorator if we are not running or resuming a flow
                return pipeline(*flow_args, **flow_kwargs)

            for arg, val in kwargs.items():
                val = str(val)  # sys.argv entries must be strings
                prefix = arg + "="
                # First occurrence of the flag, as its own token or as `--flag=value`.
                ind = next(
                    (
                        i
                        for i, token in enumerate(sys.argv)
                        if token == arg or token.startswith(prefix)
                    ),
                    None,
                )

                if ind is None:  # flag not passed: add (arg, val) to the call
                    sys.argv.extend([arg, val])
                    continue

                if sys.argv[ind] != arg:  # `--flag=value` form: rewrite in place
                    previous = sys.argv[ind][len(prefix):]
                    sys.argv[ind] = prefix + val
                elif ind + 1 < len(sys.argv):  # `--flag value` form: replace the value
                    previous = sys.argv[ind + 1]
                    sys.argv[ind + 1] = val
                else:  # flag is the last token, so there is no value to replace
                    previous = None
                    sys.argv.append(val)

                if previous is None:
                    logger.warning(
                        f"`{arg}` arg was passed without a value. "
                        f"@fix_cli_args will set it to {val}"
                    )
                else:
                    logger.warning(
                        f"`{arg}` arg was passed with value `{previous}`. However, this value will "
                        f"be overridden by @fix_cli_args with value {val}"
                    )

            logger.info(f"Fixed CLI args for {kwargs.keys()}")
            return pipeline(*flow_args, **flow_kwargs)

        return wrapper

    return decorator