This is not possible using the `@transform_df` decorator. However, it is possible using the more powerful `@transform` decorator.

API Documentation for `@transform`

Using `@transform` causes your function arguments to become `TransformInput` objects rather than DataFrames directly, and a `TransformInput` has a `path` property. Note that you will also need to read the input DataFrames and write to the output dataset manually when using `@transform`.
For example:

```python
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import union_many


@transform(
    out=Output("/path/to/my/output"),
    inp1=Input("/path/to/my/input1"),
    inp2=Input("/path/to/my/input2"),
)
def compute(out, inp1, inp2):
    # Add columns containing dataset paths.
    df1 = inp1.dataframe().withColumn("dataset_path", F.lit(inp1.path))
    df2 = inp2.dataframe().withColumn("dataset_path", F.lit(inp2.path))
    # For example.
    result = union_many(df1, df2, how="strict")
    # Write the output manually.
    out.write_dataframe(result)
```
However, note that a dataset's path is an unstable identifier: if someone moves or renames these inputs, it could cause unintended behaviour in your pipeline.

For this reason, in a production pipeline I would generally recommend using a more stable identifier. Either a manually chosen, hard-coded one (in which case you can use `@transform_df` again):
```python
from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output


@transform_df(
    Output("/path/to/my/output"),
    df1=Input("/path/to/my/input1"),
    df2=Input("/path/to/my/input2"),
)
def compute(df1, df2):
    df1 = df1.withColumn("input_dataset", F.lit("input_1"))
    df2 = df2.withColumn("input_dataset", F.lit("input_2"))
    # ...etc
```
or the dataset's RID, using `inp1.rid` instead of `inp1.path`.
Note that if you have a large number of inputs, all of these methods can be made neater using Python's varargs syntax and comprehensions:
```python
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import union_many


# Using path or rid
@transform(
    out=Output("/path/to/my/output"),
    inp1=Input("/path/to/my/input1"),
    inp2=Input("/path/to/my/input2"),
    # and many more...
)
def compute(out, **inps):
    # Add columns containing dataset rids (or paths).
    dfs = [
        inp.dataframe().withColumn("dataset_rid", F.lit(inp.rid))
        for inp in inps.values()
    ]
    # For example
    result = union_many(*dfs, how="strict")
    out.write_dataframe(result)
```
```python
from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output
from transforms.verbs.dataframes import union_many


# Using manual keys, we can reuse the argument names as keys.
@transform_df(
    Output("/path/to/my/output"),
    df1=Input("/path/to/my/input1"),
    df2=Input("/path/to/my/input2"),
    # and many more...
)
def compute(**dfs):
    # Add columns containing dataset keys.
    tagged = [
        df.withColumn("dataset_key", F.lit(key))
        for key, df in dfs.items()
    ]
    # For example
    return union_many(*tagged, how="strict")
```
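If you want to see the shape of the result without a Foundry environment, here is a minimal plain-Python sketch of the tag-then-union pattern (this is not the Foundry or Spark API; "dataframes" are simulated as lists of dicts, and `tag_and_union` is a hypothetical helper, not `union_many`):

```python
def tag_and_union(dfs):
    """Simulate the pattern above: tag each row with the key of the
    'dataframe' it came from, then union all rows together."""
    result = []
    for key, rows in dfs.items():
        for row in rows:
            # Equivalent of withColumn("dataset_key", F.lit(key))
            result.append({**row, "dataset_key": key})
    return result


dfs = {
    "df1": [{"id": 1}, {"id": 2}],
    "df2": [{"id": 3}],
}
combined = tag_and_union(dfs)
# combined == [{"id": 1, "dataset_key": "df1"},
#              {"id": 2, "dataset_key": "df1"},
#              {"id": 3, "dataset_key": "df2"}]
```

Every output row carries a `dataset_key` column identifying its source, which is exactly what the `F.lit(key)` call achieves in the Spark version.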