When combining multiple datasets in Python in a code repository, I want to put each dataset's name in the first column. I tried to get the name from the dataset's path, but couldn't figure out how to access it:

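import os

from transforms.api import transform_df, Input, Output


@transform_df(
    Output("/folder/folder1/datasets/mydatset"),
    df1=Input("A"),
    df2=Input("B"),
)
def compute(df1, df2):
    # Attempt: look for the dataset name among files in the working directory.
    print(list(filter(os.path.isfile, os.listdir())))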

How can I get my dataset name from within a transform?

3yakuya

1 Answer

This is not possible with the @transform_df decorator. However, it is possible with the more powerful @transform decorator.

API Documentation for @transform

With @transform, your function arguments are TransformInput objects rather than dataframes, and each TransformInput has a path property. Note that you also need to read each input with .dataframe() and write to the output dataset manually when using @transform.

For example:

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import union_many


@transform(
    out=Output("/path/to/my/output"),
    inp1=Input("/path/to/my/input1"),
    inp2=Input("/path/to/my/input2"),
)
def compute(out, inp1, inp2):
    # Add a column containing each dataset's path.
    df1 = inp1.dataframe().withColumn("dataset_path", F.lit(inp1.path))
    df2 = inp2.dataframe().withColumn("dataset_path", F.lit(inp2.path))

    # Combine the inputs, for example with a strict union.
    result = union_many(df1, df2, how="strict")

    # Write the output manually.
    out.write_dataframe(result)
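
The question asks for the dataset name in the first column, whereas the example above stores the full path. As a minimal sketch, assuming the name you want is simply the last component of the path, the two helpers below (dataset_name_from_path and with_first are hypothetical names, not part of the transforms API) derive the name and compute a column order with it first:

```python
import posixpath


def dataset_name_from_path(path):
    """Return the last component of a slash-separated dataset path."""
    # Assumption: paths use forward slashes, so posixpath is used
    # to get the same behaviour on every operating system.
    return posixpath.basename(path.rstrip("/"))


def with_first(columns, first):
    """Return the column names reordered so that `first` comes first."""
    return [first] + [c for c in columns if c != first]


print(dataset_name_from_path("/path/to/my/input1"))  # input1
print(with_first(["a", "b", "dataset_name"], "dataset_name"))
```

Inside the transform this could be used as F.lit(dataset_name_from_path(inp1.path)), followed by df.select(*with_first(df.columns, "dataset_name")) to move the new column to the front.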

However, note that a dataset's path is an unstable identifier. If someone moves or renames one of these inputs, the value in the column changes, which could cause unintended behaviour in your pipeline.

For this reason, I would generally recommend a more stable identifier for a production pipeline: either a manually chosen, hard-coded one (in which case you can use @transform_df again):

@transform_df(
    Output("/path/to/my/output"),
    df1=Input("/path/to/my/input1"),
    df2=Input("/path/to/my/input2"),
)
def compute(df1, df2):
    df1 = df1.withColumn("input_dataset", F.lit("input_1"))
    df2 = df2.withColumn("input_dataset", F.lit("input_2"))
    # ...etc

or the dataset's RID, using inp1.rid instead of inp1.path.

Note that if you have a large number of inputs, all of these methods can be made neater by collecting the inputs with Python's **kwargs syntax and processing them in a comprehension:

# Using path or rid
@transform(
    out=Output("/path/to/my/output"),
    inp1=Input("/path/to/my/input1"),
    inp2=Input("/path/to/my/input2"),
    # and many more...
)
def compute(out, **inps):
    # Add columns containing dataset rids (or paths).
    dfs = [
        inp.dataframe().withColumn("dataset_rid", F.lit(inp.rid))
        for inp in inps.values()
    ]

    # For example
    result = union_many(*dfs, how="strict")
    out.write_dataframe(result)


# Using manual keys, we can reuse the argument names as keys.
@transform_df(
    Output("/path/to/my/output"),
    df1=Input("/path/to/my/input1"),
    df2=Input("/path/to/my/input2"),
    # and many more...
)
def compute(**dfs):
    # Add columns containing dataset keys.
    dfs = [
        df.withColumn("dataset_key", F.lit(key))
        for key, df in dfs.items()
    ]

    # For example
    return union_many(*dfs, how="strict")
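
To see why the argument names can double as keys, here is a plain-Python sketch with no Foundry or Spark dependency. Lists of dicts stand in for dataframes, and tag_with_keys is a hypothetical helper used only for illustration:

```python
def tag_with_keys(**tables):
    """Attach each keyword-argument name to every row of its table."""
    tagged = []
    # **tables collects the keyword arguments into a dict keyed by
    # argument name, in the order the arguments were passed.
    for key, rows in tables.items():
        tagged.extend({"dataset_key": key, **row} for row in rows)
    return tagged


rows = tag_with_keys(
    df1=[{"value": 1}],
    df2=[{"value": 2}, {"value": 3}],
)
print(rows)
```

Every row ends up carrying the name of the keyword argument it arrived under, which mirrors what F.lit(key) does for each dataframe in the transform above.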
Ryan Norris