
I need to use a UDF in Spark that takes in a timestamp, an integer and another dataframe, and returns a tuple of three values.

I keep hitting error after error, and I'm no longer sure I'm fixing it the right way.

Here is the function:

    def determine_price(view_date: org.apache.spark.sql.types.TimestampType, product_id: Int, price_df: org.apache.spark.sql.DataFrame): (Double, java.sql.Timestamp, Double) = {
      var price_df_filtered = price_df.filter($"mkt_product_id" === product_id && $"created" <= view_date)
      var price_df_joined = price_df_filtered
        .groupBy("mkt_product_id")
        .agg("view_price" -> "min", "created" -> "max")
        .withColumn("last_view_price_change", lit(1))
      var price_df_final = price_df_joined
        .join(price_df_filtered, price_df_joined("max(created)") === price_df_filtered("created"))
        .filter($"last_view_price_change" === 1)
      var result = (
        price_df_final.select("view_price").head().getDouble(0),
        price_df_final.select("created").head().getTimestamp(0),
        price_df_final.select("min(view_price)").head().getDouble(0)
      )
      return result
    }
    val det_price_udf = udf(determine_price)

The error it gives me is:

    error: missing argument list for method determine_price
    Unapplied methods are only converted to functions when a function type is expected.
    You can make this conversion explicit by writing `determine_price _` or `determine_price(_,_,_)` instead of `determine_price`.

If I start adding the arguments, I keep running into other errors, such as `Int expected, Int.type found` or `object DataFrame is not a member of package org.apache.spark.sql`.
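For reference, the compiler hint is about Scala's eta-expansion: appending an underscore turns a method into a function value (and the `Int.type` mismatch above comes from passing the types themselves, e.g. `Int`, as if they were values). A minimal sketch of what the underscore form looks like, using a hypothetical two-argument version of the method with the DataFrame parameter removed:

    import org.apache.spark.sql.functions.udf

    // Hypothetical two-argument version with the DataFrame parameter removed;
    // note the parameters are plain JVM types (java.sql.Timestamp, Int),
    // not Catalyst DataTypes such as TimestampType.
    def determinePrice(viewDate: java.sql.Timestamp, productId: Int): Double = {
      ??? // the actual price lookup would go here
    }

    // The trailing underscore is the eta-expansion the compiler asks for:
    // it converts the method into a function value that udf() can wrap.
    val detPriceUdf = udf(determinePrice _)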

To give some context:

The idea is that I have a dataframe of prices (each row has a product ID, a creation date and a price) and another dataframe containing product IDs and view dates.

I need to determine the price based on the most recently created price entry that is older than the view date.

Since each product ID has multiple view dates in the second dataframe, I thought a UDF would be faster than a cross join. If anyone has a different idea, I'd be grateful.

UrVal

1 Answer


You cannot pass a DataFrame into a UDF, because the UDF runs on the workers, on a particular partition. And just as you cannot use an RDD on a worker (see: Is it possible to create nested RDDs in Apache Spark?), you cannot use a DataFrame on a worker either.

You need to work around this!
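One such workaround, sketched below as an illustrative example rather than the asker's actual setup: if the price table is small enough to collect, pull it to the driver, broadcast it as a plain Scala map, and let the UDF close over the broadcast value instead of a DataFrame (Spark 2.x style; `priceDf`, `spark` and the column names are assumptions based on the question's code):

    import org.apache.spark.sql.functions.udf

    case class PriceChange(created: java.sql.Timestamp, price: Double)

    // Collect the (assumed small) price table and index it by product,
    // sorted by creation time.
    val pricesByProduct: Map[Int, Seq[PriceChange]] =
      priceDf.select("mkt_product_id", "created", "view_price")
        .collect()
        .map(r => (r.getInt(0), PriceChange(r.getTimestamp(1), r.getDouble(2))))
        .groupBy(_._1)
        .mapValues(_.map(_._2).sortBy(_.created.getTime).toSeq)
        .toMap

    val pricesBc = spark.sparkContext.broadcast(pricesByProduct)

    // The UDF closes over the broadcast value, not over a DataFrame, and
    // returns the last price created at or before the view date, if any.
    val detPriceUdf = udf { (viewDate: java.sql.Timestamp, productId: Int) =>
      pricesBc.value.get(productId)
        .flatMap(_.filter(!_.created.after(viewDate)).lastOption)
        .map(_.price)
    }

This is the direction the comments below hint at: broadcasting works for plain Scala values collected to the driver, not for the DataFrame itself.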

Shivansh
  • Ok, I removed the dataframe from the UDF arguments. The dataframe is cached and broadcast, and it should be accessible from within the function. I still get the error: `error: type mismatch; found : Int.type required: Int val det_price_udf = udf(determine_price(org.apache.spark.sql.types.TimestampType, Int))` – UrVal Dec 19 '16 at 08:52
  • It seems that if the dataframe is not in the UDF, it cannot be used. It's not a "global variable" like I'm used to in Python. Not sure how to work around this. – UrVal Dec 19 '16 at 13:03
  • What is your use case? – Shivansh Dec 19 '16 at 17:12
  • I have a dataframe which contains a lot of page views of products (product_id, view_date) and another dataframe which tracks the changes of the price of a product (product_id, change_date, price). For each view in the first dataframe, I need to determine which price change happened right before the page was viewed. So if the price of product X changed on the 18th to 500 and on the 20th to 600, I need to determine that for the view recorded on the 19th the price was 500, and for the view on the 21st the price was 600. – UrVal Dec 19 '16 at 17:38 (see the sketch after these comments)
  • I think it would be better if you ask this as a separate question, so that more people can help you out with this use case! Till then I am thinking about this question! And as it solves this problem, please accept this answer! – Shivansh Dec 19 '16 at 17:55
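For the use case spelled out in the comments above, one UDF-free alternative is a join plus a window function. A minimal sketch, assuming a views dataframe with (product_id, view_date) and a prices dataframe with (product_id, change_date, price) as described in the comments (Spark 2.x; the dataframe names are illustrative):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // Pair each view with every price change at or before the view date...
    val joined = views.join(
      prices,
      views("product_id") === prices("product_id") &&
        prices("change_date") <= views("view_date")
    )

    // ...then keep only the most recent change per (product, view) pair.
    val w = Window
      .partitionBy(views("product_id"), views("view_date"))
      .orderBy(prices("change_date").desc)

    val priced = joined
      .withColumn("rn", row_number().over(w))
      .filter(col("rn") === 1)
      .drop("rn")

On the comment's example, the view on the 19th keeps the change from the 18th (500), and the view on the 21st keeps the change from the 20th (600).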