
The sample data, from Florian:

+-----------+-----------+-----------+
|ball_column|keep_the   |hall_column|
+-----------+-----------+-----------+
|          0|          7|         14|
|          1|          8|         15|
|          2|          9|         16|
|          3|         10|         17|
|          4|         11|         18|
|          5|         12|         19|
|          6|         13|         20|
+-----------+-----------+-----------+

The first part of the code drops every column whose name contains a word from the banned list.

# first part of the code

banned_list = ["ball","fall","hall"]
condition = lambda col: any(word in col for word in banned_list)
new_df = df.drop(*filter(condition, df.columns))

So the above piece of code should drop the ball_column and hall_column.
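As a quick sanity check (plain Python, no Spark needed), the condition picks out exactly those two columns from the sample above:

banned_list = ["ball", "fall", "hall"]
condition = lambda col: any(word in col for word in banned_list)

# column names from the sample DataFrame above
columns = ["ball_column", "keep_the", "hall_column"]
print(list(filter(condition, columns)))  # ['ball_column', 'hall_column']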

The second part of the code buckets the columns given in a list. For this example, we will bucket the only remaining column, keep_the.

bagging = Bucketizer(
    splits=[-float("inf"), 10, 100, float("inf")],
    inputCol='keep_the',
    outputCol='keep_the_bucket')
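Each pair of consecutive splits defines a half-open bucket, so these splits send values in [-inf, 10) to bucket 0.0, [10, 100) to 1.0, and [100, inf) to 2.0. A plain-Python sketch of that mapping (illustrative only, not how Bucketizer works internally):

from bisect import bisect_right

splits = [-float("inf"), 10, 100, float("inf")]

def bucket(value):
    # index of the interval [splits[i], splits[i+1]) that contains value
    return float(bisect_right(splits, value) - 1)

print([bucket(v) for v in [5, 10, 99, 250]])  # [0.0, 1.0, 1.0, 2.0]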

Bucketing the column using a pipeline then works as follows:

model = Pipeline(stages=[bagging]).fit(df)

bucketedData = model.transform(df)

How can I add the first block of code (the banned list, condition, and new_df) to the ML pipeline as a stage?

PolarBear10

1 Answer


I believe this does what you want. You can create a custom Transformer and add that to the stages in the Pipeline. Note that I slightly changed your functions because we do not have access to all the variables you mentioned, but the concept remains the same.

Hope this helps!

import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd

# CUSTOM TRANSFORMER ----------------------------------------------------------------
class ColumnDropper(Transformer):
    """
    A custom Transformer which drops all columns that have at least one of the
    words from the banned_list in the name.
    """

    def __init__(self, banned_list: Iterable[str]):
        super(ColumnDropper, self).__init__()
        self.banned_list = banned_list

    def _transform(self, df: DataFrame) -> DataFrame:
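        # drop every column whose name contains at least one banned word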
        df = df.drop(*[x for x in df.columns if any(y in x for y in self.banned_list)])
        return df


# SAMPLE DATA -----------------------------------------------------------------------
df = pd.DataFrame({'ball_column': [0,1,2,3,4,5,6],
                   'keep_the': [6,5,4,3,2,1,0],
                   'hall_column': [2,2,2,2,2,2,2] })
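# assumes an active SparkSession bound to the name `spark`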
df = spark.createDataFrame(df)


# EXAMPLE 1: USE THE TRANSFORMER WITHOUT PIPELINE -----------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
df_example = column_dropper.transform(df)


# EXAMPLE 2: USE THE TRANSFORMER WITH PIPELINE --------------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
bagging = Bucketizer(
        splits=[-float("inf"), 3, float("inf")],
        inputCol= 'keep_the',
        outputCol="keep_the_bucket")
model = Pipeline(stages=[column_dropper,bagging]).fit(df)
bucketedData = model.transform(df)
bucketedData.show()

Output:

+--------+---------------+
|keep_the|keep_the_bucket|
+--------+---------------+
|       6|            1.0|
|       5|            1.0|
|       4|            1.0|
|       3|            1.0|
|       2|            0.0|
|       1|            0.0|
|       0|            0.0|
+--------+---------------+
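One caveat (also raised in the comments below): a plain Transformer subclass like ColumnDropper cannot be saved and re-loaded with the fitted pipeline. If you need persistence, one approach, shown here as a minimal untested sketch assuming Spark >= 2.3 (PersistableColumnDropper and bannedList are names I made up), is to store the settings as Params and mix in DefaultParamsReadable and DefaultParamsWritable:

from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class PersistableColumnDropper(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    # declaring the setting as a Param lets DefaultParamsWritable serialize it
    bannedList = Param(Params._dummy(), "bannedList",
                       "drop columns whose name contains any of these words")

    def __init__(self, bannedList=None):
        super().__init__()
        self._setDefault(bannedList=[])
        if bannedList is not None:
            self._set(bannedList=bannedList)

    def _transform(self, df):
        banned = self.getOrDefault(self.bannedList)
        return df.drop(*[c for c in df.columns if any(w in c for w in banned)])

Storing the word list as a Param rather than a plain attribute is what lets Spark's default readers and writers serialize the stage.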

Also, note that if your custom method requires fitting (e.g. a custom StringIndexer), you should also create a custom Estimator:

from pyspark.ml import Estimator

class CustomTransformer(Transformer):

    def _transform(self, df: DataFrame) -> DataFrame:
        ...


class CustomEstimator(Estimator):

    def _fit(self, df: DataFrame) -> CustomTransformer:
        ...
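For example, a toy Estimator that computes the mean of a column at fit time and returns a Transformer that fills nulls with that mean could look like this (a sketch only; MeanFiller and MeanFillerEstimator are made-up names, not part of the Spark API):

import pyspark.sql.functions as F
from pyspark.ml import Estimator, Transformer
from pyspark.sql import DataFrame

class MeanFiller(Transformer):
    """Fills nulls in `col` with a pre-computed mean (hypothetical example)."""

    def __init__(self, col: str, mean: float):
        super().__init__()
        self.col = col
        self.mean = mean

    def _transform(self, df: DataFrame) -> DataFrame:
        return df.fillna({self.col: self.mean})


class MeanFillerEstimator(Estimator):
    """Computes the mean of `col` during fit and returns a fitted MeanFiller."""

    def __init__(self, col: str):
        super().__init__()
        self.col = col

    def _fit(self, df: DataFrame) -> MeanFiller:
        mean = df.agg(F.avg(self.col)).first()[0]
        return MeanFiller(self.col, mean)

Inside a Pipeline, fit() is called on the Estimator automatically, and the returned Transformer takes its place in the fitted PipelineModel.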
Florian
  • +1. But what's funny is that your answer contradicts this one here: https://stackoverflow.com/questions/51402369/how-to-bucketize-a-group-of-columns-in-pyspark – eliasah Jul 19 '18 at 08:19
  • @eliasah Thanks, I did not really look at the Bucketizer, I just tried to create a custom Pipeline for the first time. Can you help me understand what the contradiction is, though? I fail to see it. – Florian Jul 19 '18 at 08:26
  • The issue is just the direction in which each is advising the OP. But I like this approach; it is cleaner. – eliasah Jul 19 '18 at 08:29
  • I know, @Matthew. Don't take it wrong! I just advise you to put more effort into describing your problem. Your question looks a bit like an XY problem, and I strongly advise you to read https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples to help you write better questions ;) – eliasah Jul 19 '18 at 08:32
  • @Florian I am not sure why this question has been edited to "how to drop columns in a ML pipeline". I am not trying to drop columns in a ML pipeline; I am trying to add custom functions to the ML pipeline, whether it's bucketing, dropping columns, reformatting columns, etc. – PolarBear10 Jul 19 '18 at 08:39
  • @Matthew, I fully agree. Someone else changed it, I changed it back to the broader title. – Florian Jul 19 '18 at 08:41
  • @Florian, thank you for the robust answer. I have been sitting and decrypting the structure of your code since you posted the answer, and will continue to do so for the next week. Here are all my current challenges; feel free to address any or none of them. 1) Why did we pass `Transformer` as an argument in class ColumnDropper? 2) Why did we use `Iterable[str]` in the class constructor? 3) What is the use of super().__init__()? 4) What is _transform? 5) Why did we pass `df: DataFrame` as an argument in the `_transform` method? 6) What is `-> DataFrame` in the _transform method? – PolarBear10 Jul 20 '18 at 12:35
  • Hi @Matthew, 1) So our class inherits from Transformer, see [here](https://www.python-course.eu/python3_inheritance.php). 2) To indicate that this argument should be an iterable (e.g. a list) of strings. 3) To initialize the __init__() function of Transformer. 4) The transform method that transforms the input df. 5) The method expects a `DataFrame`; how would it otherwise know which `DataFrame` to transform? 6) It indicates that the output will be of class `DataFrame` (type hinting). Hope this clarifies a bit ;) – Florian Jul 20 '18 at 12:45
  • @Florian Regarding example 2 in your above solution, under the heading **using transformer with pipeline**, specifically the **bucketizer** part: in your code, it takes one specific column and outputs one specific column. How can I modify it to take in multiple columns and output multiple columns? Kindly check the above-updated part in my question for the code. If we pass it directly to the pipeline, the error message becomes __Cannot recognize a pipeline stage of type __. What do you advise on this issue? :) – PolarBear10 Jul 20 '18 at 16:34
  • @Matthew It sounds like you are trying to do `stages=[column_dropper,bagging]`, where `bagging` has been created with `bagging = [...]`, so it is already a list. It should be `bagging= ...`. In the future, if you have follow up questions, please consider opening a new question on Stack Overflow instead of editing your current one ;) – Florian Jul 21 '18 at 06:49
  • @Florian Sure! Sorry about that. I will revert the question to what it was and open a new question. – PolarBear10 Jul 21 '18 at 07:48
  • @Florian Thanks for the answer. I am trying to save your pipeline for use in Scala. When I load the PipelineModel in Scala, I get "Expected class name org.apache.spark.ml.PipelineModel but found class name pyspark.ml.pipeline.PipelineModel". If I don't add a custom class like yours as a stage to my PipelineModel in PySpark, I can successfully load that PipelineModel into Scala without any complication. Any idea about that issue? – neverwinter Apr 17 '19 at 14:03
  • `CustomTransformer` should extend `pyspark.ml.Model` instead of `Transformer` (and maybe the name can be changed). Otherwise, you might run into serialization issues, such as when using MLflow (see https://mlflow.org/docs/latest/python_api/mlflow.spark.html#mlflow.spark.log_model). – minhle_r7 Jul 28 '21 at 17:38