
I have a DataFrame on which I need to perform multiple transformations: cleansing, renaming, and so on. How can I define custom methods on a class (or as static methods) and chain them as below, to handle column- and row-wise DataFrame transformations?

targetdf = testdf.cols.remove_white_spaces('*').cols.rename('test','test_new')

I tried using a decorator as a first step towards this, but was not successful, since the IDE doesn't handle dynamically added methods created via a property decorator (see Python Decorator/Class method & PyCharm).

rajdallas

1 Answer


You're looking to monkey-patch the pyspark.sql.DataFrame class.

I'd urge caution, though. While monkey-patching has its uses, it can confuse your IDE, or worse, the people who read your code.
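For completeness, here is a minimal sketch of what such a patch could look like for the accessor you describe. The _Cols helper and its rename method are hypothetical names for illustration, not a real PySpark API:

from pyspark.sql import DataFrame

class _Cols:
    # Hypothetical helper namespace: each method returns a new
    # DataFrame, so further .cols calls can be chained.
    def __init__(self, df):
        self._df = df

    def rename(self, old, new):
        return self._df.withColumnRenamed(old, new)

# Expose the helper as a read-only property on every DataFrame.
DataFrame.cols = property(lambda self: _Cols(self))

# Usage: targetdf = testdf.cols.rename('test', 'test_new')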

One of the most common PySpark techniques is to create functions that accept a DataFrame and return a DataFrame. This will allow you to chain such operations, using the transform() method (which exists in Spark's Scala API, but sadly was not in PySpark prior to version 3.0, unless you monkey-patched it in).
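If you want transform() on an older PySpark anyway, the patch is tiny; here is a sketch mirroring what PySpark 3.0 ships natively:

from pyspark.sql import DataFrame

def transform(self, f):
    # Apply a DataFrame -> DataFrame function and return its result,
    # enabling chains like df.transform(f).transform(g).
    return f(self)

DataFrame.transform = transform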

If you're new to monkey-patching and not on PySpark 3.x yet (in preview since Nov 6th), you could also do it like this:


from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame whose column names contain awkward whitespace.
df = spark.createDataFrame([
    ("A", 0),
    ("B", 1),
    ("C", 2)],
    schema=("K E   Y", "cols with   sp  aces"))


def remove_whitespace(s):
    # Strip every whitespace character from a string.
    return "".join(char for char in s if not char.isspace())


def remove_whitespace_in_colnames(frame):
    # Map each column name to its whitespace-free form, then fold
    # withColumnRenamed over the DataFrame to apply all the renames.
    old_names_to_new = {c: remove_whitespace(c) for c in frame.columns}
    return reduce(lambda df, old_new: df.withColumnRenamed(*old_new),
                  old_names_to_new.items(),
                  frame)


def other_func(df):
    # Placeholder for another DataFrame -> DataFrame transformation.
    return df


def other_function(df):
    # Placeholder for another DataFrame -> DataFrame transformation.
    return df


functions_to_apply = (remove_whitespace_in_colnames, other_function, other_func)
# Feed the result of each function into the next, starting from df.
result = reduce(lambda df, func: func(df),
                functions_to_apply,
                df)
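
With the transform() patch sketched above (or on PySpark 3.x, where it is built in), the same pipeline reads as a method chain:

result = (df.transform(remove_whitespace_in_colnames)
            .transform(other_function)
            .transform(other_func))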
Oliver W.