My goal is to clean the data in a column of a PySpark DataFrame. I have written a function for the cleaning:
import re
import string

def preprocess(text):
    text = text.lower()
    text = text.strip()
    text = re.compile('<.*?>').sub('', text)                                  # strip HTML tags
    text = re.compile('[%s]' % re.escape(string.punctuation)).sub(' ', text)  # punctuation -> space
    text = re.sub(r'\s+', ' ', text)                                          # collapse whitespace
    text = re.sub(r'\[[0-9]*\]', ' ', text)                                   # drop bracketed numbers like [3]
    text = re.sub(r'[^\w\s]', '', text.lower().strip())                       # drop remaining non-word characters
    text = re.sub(r'\d', ' ', text)                                           # drop digits
    text = re.sub(r'\s+', ' ', text)
    return text
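For instance, tracing it on a single sentence, I believe it produces:

    preprocess('They (both) like <b>running</b>!')   # -> 'they both like running'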
# LEMMATIZATION
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords, wordnet
from nltk.tokenize import word_tokenize

# Initialize the lemmatizer
wl = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

def remove_stopwords(text):
    # Note: this returns a list of tokens, not a string
    return [i for i in text.split() if i not in stop_words]
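Checking it on the cleaned sample from above (assuming 'they' and 'both' are in NLTK's English stopword list, which I believe they are):

    remove_stopwords('they both like running')   # -> ['like', 'running']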
# This is a helper function to map NLTK POS tags onto WordNet POS tags
def get_wordnet_pos(tag):
    if tag.startswith('J'):
        return wordnet.ADJ
    elif tag.startswith('V'):
        return wordnet.VERB
    elif tag.startswith('N'):
        return wordnet.NOUN
    elif tag.startswith('R'):
        return wordnet.ADV
    else:
        return wordnet.NOUN
# Tokenize the sentence, POS-tag it, and lemmatize each token
def lemmatizer(string):
    word_pos_tags = nltk.pos_tag(word_tokenize(string))  # get POS tags
    a = [wl.lemmatize(word, get_wordnet_pos(tag)) for word, tag in word_pos_tags]  # map the POS tag and lemmatize the word/token
    return " ".join(a)
# Final function
def finalpreprocess(string):
    return lemmatizer(' '.join(remove_stopwords(preprocess(string))))
The functions seem to work fine when I test them. When I do
text = 'Ram and Bheem are buddies. They (both) like <b>running</b>. They got better at it over the weekend'
print(finalpreprocess(text))
I see the exact result I want.
ram bheem buddy like run get well weekend
However, when I try to apply finalpreprocess() to a column of a PySpark DataFrame, I get errors. Here is what I did:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

udf_txt_clean = udf(lambda x: finalpreprocess(x), StringType())
df.withColumn("cleaned_text", udf_txt_clean(col("reason"))).select("reason", "cleaned_text").show(10, False)
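(Here df is just a DataFrame with a string column named reason; a hypothetical stand-in for my real data would be:)

    # Hypothetical example row; my real "reason" column holds free text like this
    df = spark.createDataFrame([("Ram and Bheem are buddies. They (both) like <b>running</b>.",)], ["reason"])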
Then I get this error:
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 473, in dumps
return cloudpickle.dumps(obj, pickle_protocol)
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 563, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.RLock' object
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
Here is what I have tried so far. finalpreprocess() uses three different functions: preprocess(), remove_stopwords(), and lemmatizer(), so I changed udf_txt_clean to wrap each one separately:

udf_txt_clean = udf(lambda x: preprocess(x), StringType())
udf_txt_clean = udf(lambda x: remove_stopwords(x), StringType())

These two run fine, but

udf_txt_clean = udf(lambda x: lemmatizer(x), StringType())

is the one that gives me the error. I cannot understand why this function fails but the other two do not. From my limited understanding, Spark is having trouble pickling this function, but I do not see why it needs to pickle it in the first place, or whether there is a workaround.
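If it helps narrow this down, my assumption (which I have not been able to confirm) is that Spark serializes the UDF's entire closure with cloudpickle, and lemmatizer() is the only one of the three that captures the NLTK objects (wl and the lazily loaded wordnet corpus). A minimal check of that assumption outside of Spark would be something like:

    from pyspark import cloudpickle

    # preprocess() and remove_stopwords() only capture re, string, and a plain set
    cloudpickle.dumps(preprocess)
    cloudpickle.dumps(remove_stopwords)

    # lemmatizer() captures wl and the wordnet corpus reader; if the assumption
    # is right, this is the call that should raise the same _thread.RLock TypeError
    cloudpickle.dumps(lemmatizer)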