I have the following small piece of code.
# do all the required imports
import pyspark
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
#create a session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
#fashion_df is described below
fashion_df = pd.read_csv('fashion_df.csv')
#create a UDF
def check_merchant_cat(text):
    # Null or non-string merchant categories map to "N/A"
    if not isinstance(text, str):
        return "N/A"
    # Look up the matching row in fashion_df and return its category value
    match = fashion_df.loc[fashion_df['merchant_category'] == text, 'category']
    if match.empty:
        return "N/A"
    return match.iloc[0]
merchant_category_mapping_func = udf(check_merchant_cat)
df = spark.read.csv('datafeed.csv', header=True, inferSchema=True)
processed_df = df.withColumn("merchant_category_mapped", merchant_category_mapping_func(df['merchant_category']))
processed_df.select('merchant_category_mapped', 'merchant_category').show(10)
Let me describe the problem I am trying to solve.
I have fashion_df, a CSV with around 1,000 rows and a header like this:
merchant_category,category,sub_category
Dresses & Skirts,Dress,Skirts
I also have datafeed.csv, referenced in the code above, which has around 1 million rows. Each row has many columns, but only a few of them are of interest.
Basically, I want to go over every row of datafeed.csv, look at the merchant_category column of that row, and search for this value in the merchant_category column of the fashion_df pandas DataFrame. Once the matching row is found, I take the value in the category column of that matching row and return it.
The returned category value is appended as a new column to the original datafeed DataFrame loaded in PySpark.
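To make the intended mapping concrete, here is the lookup expressed in plain pandas for the sample row shown above (the value 'Dresses & Skirts' is just taken from that sample row, and this is only a sketch of what the UDF should do for a single value):
mask = fashion_df['merchant_category'] == 'Dresses & Skirts'
# take the category of the first matching row; expected to be 'Dress'
mapped = fashion_df.loc[mask, 'category'].iloc[0]
print(mapped)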
Is this the right way to do this?