
I'm writing my own transformers for my PySpark project, and I'm stumbling on a problem:

If I write a transformer right in the module / notebook where I'll use it, everything works fine; for example:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import (HasInputCol, HasInputCols, HasOutputCol, 
    HasOutputCols, Param)
from pyspark.sql import (SparkSession, types, functions as funcs)

spark = SparkSession.builder.appName('my_session').getOrCreate()

# My Custom Transformer 1:
class MyTransformerOne(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol='my_input', outputCol='my_output'):
        super(MyTransformerOne, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol='my_input', outputCol='my_output'):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # I need a little dataframe here to perform some tasks:
        df = spark.createDataFrame(
            [
                {'col1': 1, 'col2': 'A'}, {'col1': 2, 'col2': 'B'}
            ],
            schema=types.StructType([
                types.StructField('col1', types.IntegerType(), True),
                types.StructField('col2', types.StringType(), True),
            ])
        )
        # Lots of other things happen here... the little dataframe above
        # is joined with the 'to be transformed' dataset and some columns
        # are calculated.
        return final_dataset

df = MyTransformerOne().transform(input_df)
# This works Ok

I have about seven of these transformers, so I'd like to store them in a separate module (let's call it my_transformers.py), and I thought: "well, I'll need a SparkSession object to make this work... so let's put it in the __init__ method". However, it doesn't work:

"""
my_transformers.py
"""

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import (HasInputCol, HasInputCols, HasOutputCol, 
    HasOutputCols, Param)
from pyspark.sql import (types, functions as funcs)

class MyTransformerOne(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, spark=None, inputCol='my_input', outputCol='my_output'):
        super(MyTransformerOne, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol='my_input', outputCol='my_output'):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # Let's use the instance attribute to create the dataframe
        df = self.spark.createDataFrame(...)
        # ... same as above

Then, on my other module / notebook:

import my_transformers

# ... Create a spark session, load the data, etcetera
df = my_transformers.MyTransformerOne().transform(input_df)

This fails:

AttributeError: 'MyTransformerOne' object has no attribute 'spark'

I'm lost here. So, my questions are:

  1. Can I pass a SparkSession object to a custom transformer object?
  2. How can I make this work? I really need to create those dataframes inside the transformer classes (there's no point in creating them outside the transformers, since they will not be used in any other task).

Can you point me in the right direction?


1 Answer


It turned out to be easier than I thought!

I found that I can call SparkSession.builder.getOrCreate() inside my classes: if a session is already active (the one created in the notebook), the builder returns that same session instead of creating a new one. My original attempt failed because spark was merely an argument to __init__; it was never assigned to self.spark, hence the AttributeError. Once the my_transformers module is imported, every time I need the Spark session I only need to add that one line to my methods.
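A quick sanity check (just an illustration, not part of the module) shows why this works: the builder hands back the very same object that is already active, rather than building a second session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('my_session').getOrCreate()
same_session = SparkSession.builder.getOrCreate()

# Both names point to the same object: the builder reuses the active session
assert spark is same_session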

So, the full code is something like this:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import (HasInputCol, HasInputCols, HasOutputCol, 
    HasOutputCols, Param)
from pyspark.sql import (SparkSession, types, functions as funcs)

# My Custom Transformer 1:
class MyTransformerOne(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol='my_input', outputCol='my_output'):
        super(MyTransformerOne, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol='my_input', outputCol='my_output'):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # HERE! I get the active SparkSession.
        spark = SparkSession.builder.getOrCreate()

        # I need a little dataframe here to perform some tasks:
        df = spark.createDataFrame(
            [
                {'col1': 1, 'col2': 'A'}, {'col1': 2, 'col2': 'B'}
            ],
            schema=types.StructType([
                types.StructField('col1', types.IntegerType(), True),
                types.StructField('col2', types.StringType(), True),
            ])
        )
        # Lots of other things happen here... the little dataframe above
        # is joined with the 'to be transformed' dataset and some columns
        # are calculated.
        return final_dataset

df = MyTransformerOne().transform(input_df)
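And about question 1: as far as I can tell, nothing stops you from storing the session on the transformer yourself; my first attempt failed only because spark was accepted as an argument but never assigned to self.spark. Here is a rough sketch of that variant (MyTransformerTwo is just an illustrative name; setParams and _transform stay the same as above, with _transform using self.spark.createDataFrame):

class MyTransformerTwo(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, spark=None, inputCol='my_input', outputCol='my_output'):
        super(MyTransformerTwo, self).__init__()
        kwargs = self._input_kwargs
        # 'spark' is a plain instance attribute, not a Param, so pull it out
        # of the kwargs before handing the rest to setParams() / _set()
        self.spark = kwargs.pop('spark', None) or SparkSession.builder.getOrCreate()
        self.setParams(**kwargs)

Keep in mind, though, that pinning a session to the instance like this can get in the way of pipeline persistence (the session itself isn't serializable), so the getOrCreate() call inside _transform is the lighter-weight option.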

I'll leave this post here, and I'll mark my question as a duplicate.
