
I'm creating a pyspark udf inside a class-based view, and the function I want to call is defined in another class-based view; both of them are in the same file (api.py). But when I inspect the content of the resulting dataframe, I get this error:

ModuleNotFoundError: No module named 'api'

I can't understand why this happens. I tried similar code in the pyspark console and it worked fine. A similar question was asked here, but the difference is that I'm trying to do it in the same file.

This is a piece of my full code: api.py

class TextMiningMethods():
    def clean_tweet(self,tweet):
        '''
        some logic here
        '''
        return "Hello: "+tweet


class BigDataViewSet(TextMiningMethods,viewsets.ViewSet):

    @action(methods=['post'], detail=False)
    def word_cloud(self, request, *args, **kwargs): 
        '''
        some previous logic here
        '''
        spark=SparkSession \
            .builder \
            .master("spark://"+SPARK_WORKERS) \
            .appName('word_cloud') \
            .config("spark.executor.memory", '2g') \
            .config('spark.executor.cores', '2') \
            .config('spark.cores.max', '2') \
            .config("spark.driver.memory",'2g') \
            .getOrCreate()

        spark.sparkContext.addPyFile('path/to/udfFile.py')
        cols = ['text']
        rows = []

        for tweet_account_index, tweet_account_data in enumerate(tweets_list):

            tweet_data_aux_pandas_df = pd.Series(tweet_account_data['tweet']).dropna()
            for tweet_index,tweet in enumerate(tweet_data_aux_pandas_df):
                row= [tweet['text']]
                rows.append(row)

        # Create a Pandas Dataframe of tweets
        tweet_pandas_df = pd.DataFrame(rows, columns = cols)

        schema = StructType([
            StructField("text", StringType(),True)
        ])

        # Converts to Spark DataFrame
        df = spark.createDataFrame(tweet_pandas_df,schema=schema)
        clean_tweet_udf = udf(TextMiningMethods().clean_tweet, StringType())
        clean_tweet_df = df.withColumn("clean_tweet", clean_tweet_udf(df["text"]))
        clean_tweet_df.show()   # This line produces the error
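
For reference, the snippet above relies on imports roughly along these lines (the exact lines are trimmed from my file; SPARK_WORKERS and tweets_list are defined elsewhere):

# Rough set of imports the snippet depends on (not the exact lines from my file)
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType
from rest_framework import viewsets
from rest_framework.decorators import action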

This similar test in the pyspark console works fine:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import udf
def clean_tweet(name):
    return "This is " + name

schema = StructType([StructField("Id", IntegerType(),True),StructField("tweet", StringType(),True)])

data = [[ 1, "tweet 1"],[2,"tweet 2"],[3,"tweet 3"]]
df = spark.createDataFrame(data,schema=schema)

clean_tweet_udf = udf(clean_tweet,StringType())
clean_tweet_df = df.withColumn("clean_tweet", clean_tweet_udf(df["tweet"]))
clean_tweet_df.show()

So these are my questions:

  1. What is this error related to, and how can I fix it?
  2. What is the right way to create a pyspark udf when you're working with class-based views? Is it bad practice to write the functions that you will use as pyspark udfs in the same file where you call them? (In my case, all my API endpoints work with Django REST Framework.)

Any help will be appreciated, thanks in advance

UPDATE:

This link and this link explain how to use custom classes with pyspark using SparkContext, but not with SparkSession, which is my case. Still, I used this:

spark.sparkContext.addPyFile('path/to/udfFile.py')

The problem is that I defined the class with the functions to use as pyspark udfs in the same file where I'm creating the udf for the dataframe (as shown in my code). I couldn't find out how to achieve that behaviour when the path passed to addPyFile() points to the same file the call is made from. In spite of that, I moved my code and followed these steps (fixing another error along the way):

  • Create a new folder called udf
  • Create a new empty __init__.py file, to make the directory a package.
  • And create a pyspark_udf.py file for my udf functions.
core/
├── udf/
│   ├── __init__.py
│   ├── __pycache__
│   └── pyspark_udf.py
└── api/
    ├── admin.py
    ├── api.py
    ├── apps.py
    ├── __init__.py
In this file, I tried to import the dependencies either at the beginning of the file or inside the function. In every case I receive ModuleNotFoundError: No module named 'udf'.

pyspark_udf.py

import re
import string
import unidecode
from nltk.corpus import stopwords

class TextMiningMethods():
    """docstring for TextMiningMethods"""
    def clean_tweet(self, tweet):
        # some logic here
        return tweet  # placeholder
I have tried all of these. At the beginning of my api.py file:

from udf.pyspark_udf import TextMiningMethods

# or

from udf.pyspark_udf import *

And inside the word_cloud function:

class BigDataViewSet(viewsets.ViewSet):
    def word_cloud(self, request, *args, **kwargs):
        from udf.pyspark_udf import TextMiningMethods

In the python debugger this line works:

from udf.pyspark_udf import TextMiningMethods

But when I show the dataframe, I receive the error:

clean_tweet_df.show()

ModuleNotFoundError: No module named 'udf'

Obviously, the original problem changed into another one; now my problem is more related to this question, but I still couldn't find a satisfactory way to import the file and create a pyspark udf that calls a class function from another class function.

What am I missing?

Manuel Carrero
  • What is a 'class based view'? – Vitaliy Mar 22 '20 at 20:47
  • Does this answer your question? [How to use custom classes with Apache Spark (pyspark)?](https://stackoverflow.com/questions/31093179/how-to-use-custom-classes-with-apache-spark-pyspark) – user10938362 Mar 22 '20 at 21:13
  • @Vitaliy this link explains what class-based views are in Django: docs.djangoproject.com/en/3.0/topics/class-based-views – Manuel Carrero Apr 01 '20 at 15:44
  • @user10938362 I updated my answer with all the things that I have tried, starting from the link you provided, which is similar but not the same case – Manuel Carrero Apr 01 '20 at 15:44
  • I would try a different approach: can you extract the logic from the CBV and make it Django-free? Your code samples mention text mining, so I guess there is core functionality that is not related to hosting (to the fact that it is served by a web service). I would do it even as an experiment, for differential diagnosis. – Vitaliy Apr 01 '20 at 18:43

2 Answers


After different tries, I couldn't find a solution by pointing the path of addPyFile() at the same file where I was creating the udf (I would like to know if this is a bad practice) or at another file. Technically, the addPyFile(path) documentation says:

Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

So what I mention should be possible. Based on that, I had to use this solution and zip the whole udf folder from its top level with:

zip -r udf.zip udf
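
With the zip in place, I register it on the SparkContext and import the class from the shipped package. A minimal sketch of the calling code, assuming the session is named spark and adjusting the path to the zip:

# Ship the zipped udf package to every executor
spark.sparkContext.addPyFile('path/to/udf.zip')

from udf.pyspark_udf import TextMiningMethods

clean_tweet_udf = udf(TextMiningMethods().clean_tweet, StringType())
clean_tweet_df = df.withColumn("clean_tweet", clean_tweet_udf(df["text"]))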

Also, in pyspark_udf.py I had to import my dependencies as below to avoid this problem:

class TextMiningMethods():
    """docstring for TextMiningMethods"""
    def clean_tweet(self,tweet):
        import re
        import string
        import unidecode
        from nltk.corpus import stopwords

Instead of:

import re
import string
import unidecode
from nltk.corpus import stopwords

class TextMiningMethods():
    """docstring for TextMiningMethods"""
    def clean_tweet(self,tweet):

Then, finally, this line worked fine:

clean_tweet_df.show()

I hope this is useful for someone else.

Manuel Carrero

Thank you! Your approach worked for me.

Just to clarify my steps:

  • Made a udf module with __init__.py and pyspark_udfs.py
  • Made a bash file to zip udfs first and then run my files on the top level:

runner.sh

echo "zipping udfs..."
zip -r udf.zip udf
echo "udfs zipped"

echo "running script..."
/opt/conda/bin/python runner.py
echo "script ended."
  • In the actual code I imported my udfs from the udf.pyspark_udfs module and initialized them in the Python function where I need them, like so:

    def _produce_period_statistics(self, df: pyspark.sql.DataFrame, period: str) -> pyspark.sql.DataFrame:

        """ Produces basic and trend statistics based on user visits."""

        # udfs
        get_hist_vals_udf = F.udf(lambda array, bins, _range: get_histogram_values(array, bins, _range), ArrayType(IntegerType()))
        get_hist_edges_udf = F.udf(lambda array, bins, _range: get_histogram_edges(array, bins, _range), ArrayType(FloatType()))
        get_mean_udf = F.udf(get_mean, FloatType())
        get_std_udf = F.udf(get_std, FloatType())
        get_lr_coefs_udf = F.udf(lambda bar_height, bar_edges, hist_upper: get_linear_regression_coeffs(bar_height, bar_edges, hist_upper), StringType())
   ...
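
One detail not shown above: udf.zip still has to be made visible to the executors, either with spark-submit --py-files udf.zip or by registering it on the SparkContext. A minimal sketch, assuming a session named spark:

# Make the zipped package importable on the executors as well
spark.sparkContext.addPyFile('udf.zip')

from udf.pyspark_udfs import get_mean  # now resolvable on the workers too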