
I have the following small piece of code.

# do all the required imports
import pyspark
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession

#create a session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#fashion_df is described below
fashion_df = pd.read_csv('fashion_df.csv')

#create a UDF
def check_merchant_cat(text):
    if not isinstance(text, str):
        category = "N/A"
        return category

    category = fashion_df[fashion_df['merchant_category']==text]['merchant_category']

    return category

merchant_category_mapping_func = udf(check_merchant_cat)

df = spark.read.csv('datafeed.csv', header=True, inferSchema=True)

processed_df = df.withColumn("merchant_category_mapped", merchant_category_mapping_func(df['merchant_category']))

processed_df.select('merchant_category_mapped', 'merchant_category').show(10)

Let me describe the problem I am trying to solve.

I have a fashion_df, which is basically around 1,000 rows with a header like this:

merchant_category,category,sub_category
Dresses & Skirts,Dress,Skirts

I also have the datafeed.csv referred to in the code above, which has around 1 million rows. Each row has multiple columns, but only a few of them are of interest.

Basically, I want to go over every row of datafeed.csv, look at that row's merchant_category column, and search for this value in the "merchant_category" column of the fashion_df pandas DataFrame. Once a matching row is found, I take the value in the category column of that row of fashion_df and return it.

The returned category value is then appended as a new column to the original datafeed loaded in PySpark.
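To make the intent concrete, this is roughly what the lookup should do per row in plain pandas (the exact fallback handling is just an illustration); in particular, it should return a single category string rather than a Series:

def lookup_category(text):
    if not isinstance(text, str):
        return "N/A"
    # rows of fashion_df whose merchant_category equals the incoming value
    matches = fashion_df[fashion_df['merchant_category'] == text]
    if matches.empty:
        return "N/A"
    # take the 'category' value of the first matching row as a plain string
    return matches['category'].iloc[0]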

Is this the right way to do this?

London guy
  • This is almost certainly not the right way to do this. A better approach is to convert your pandas DataFrame into a spark DataFrame and do a join. The `udf` here will be very slow and expensive. You can probably do everything using only API functions (runs in the JVM). – pault Aug 22 '18 at 15:21
  • @pault, can you please clarify what you mean by doing everything using API functions? So you mean no pyspark required? – London guy Aug 22 '18 at 15:49
  • Also @pault, please note that what I wrote is a basic version of what I actually want to do. The category value is not based only on a join on the merchant_category column; in addition, I need to compare the title column against a regex. Only if both conditions match should I return the category value. – London guy Aug 22 '18 at 15:50
  • @pault, I am actually OK if this whole process takes a few minutes, so long as it is not 50-60 minutes for a million rows. – London guy Aug 22 '18 at 15:51
  • Can you [edit] your question to include a [reproducible sample of your inputs and the desired output](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples)? Regarding API functions vs. `udf`, I mean the Spark API functions. Read [this post](https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance) for a detailed explanation. – pault Aug 22 '18 at 15:59
  • Also, your title says *"Why does it throw this weird error?"* but you didn't include any mention of that in your post. Can you also include the full traceback? – pault Aug 22 '18 at 16:06
  • Sure @pault, let me edit the question tonight and add more details. Would you be happy to provide an answer tomorrow? Highly appreciated. – London guy Aug 22 '18 at 16:10
  • I work in AI and I am learning PySpark now. Your guidance on this specific problem would be of enormous help. Thanks a lot for your time. – London guy Aug 22 '18 at 16:12
  • Including (multiple) rows filled only with `#` only clutters your code while being completely unnecessary here; kindly avoid it in the future (removed). – desertnaut Aug 22 '18 at 23:42
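For reference, here is a minimal sketch of the join-based approach suggested in the comments, extended with the regex-on-title condition mentioned above (the title column and the per-row regex_pattern column on fashion_df are assumptions for illustration, not part of the original code):

import pyspark.sql.functions as F

# turn the small pandas DataFrame into a Spark DataFrame and rename the join key
fashion_sdf = spark.createDataFrame(fashion_df).withColumnRenamed("merchant_category", "mc")

# match on merchant_category equality AND on a regex held in the hypothetical regex_pattern column
condition = (df["merchant_category"] == fashion_sdf["mc"]) & F.expr("title rlike regex_pattern")

joined = df.join(fashion_sdf, condition, "left")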

1 Answer


Step zero: import functions:

from pyspark.sql.functions import *

First step: create a Spark DataFrame:

#Instead of: fashion_df = pd.read_csv('fashion_df.csv')
fashion_df = spark.read.csv('fashion_df.csv', header=True, inferSchema=True).withColumnRenamed("merchant_category", "mc")

The column rename is just to make the join condition simpler to write later.

Second step: join with this DataFrame. Important: do a left join, so that later you can map nulls to the "N/A" category:

df_with_fashion = df.join(fashion_df, df.merchant_category == fashion_df.mc, 'left')
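Since fashion_df is small (around 1,000 rows) compared to the 1-million-row datafeed, a broadcast hint may speed up the join; a possible variant of the same join (my addition, not required):

from pyspark.sql.functions import broadcast

# broadcast the small lookup table to every executor so the big table is not shuffled
df_with_fashion = df.join(broadcast(fashion_df), df.merchant_category == fashion_df.mc, 'left')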

Third step: create the new mapped column, turning nulls (rows with no match) into "N/A":

processed_df = df_with_fashion.withColumn("merchant_category_mapped", coalesce(col("category"), lit("N/A")))
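Then you can preview the result the same way as in the question:

processed_df.select('merchant_category_mapped', 'merchant_category').show(10)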
T. Gawęda
  • Note: I see pault wrote some comments; here I also added code and some explanation. If he posts an answer, I will upvote it too ;) – T. Gawęda Aug 22 '18 at 18:59