
I need to allow users to define named collections which they can later use when constructing Spark DataFrame SQL queries.

I planned to use Spark broadcast variables for this purpose, but based on the following SO question, How to refer broadcast variable in Spark DataFrameSQL, it looks like this is impossible.

Let's say as a user I have created the following collection through the application UI:

name: countries_dict
values: Seq("Italy", "France", "United States", "Poland", "Spain")

In another part of the application UI (let's say a different page), as the user I have created the following Spark SQL query:

SELECT name, phone, country FROM users

and I'd like to filter the records with something like SELECT name, phone, country FROM users WHERE country IN countries_dict

So, for example, right now I can create something similar in the following way:

val countriesDict = Seq("Italy", "France", "United States", "Poland", "Spain")

val inDict = (s: String) => {
  countriesDict.contains(s)
}

spark.udf.register("in_dict", inDict)

and then:

SELECT name, phone, country FROM users WHERE in_dict(country)

The biggest issue with this approach is that countriesDict is hardcoded rather than created dynamically based on the user input from the UI.

Is it possible to extend this approach somehow to support collections (with names and elements) created dynamically by users via the application UI?

alexanoid

2 Answers


Of course I don't know about your application's UI etc., but is there anything that speaks against turning the collections into DataFrames? You could not use the WHERE country IN countries_dict syntax; you would have to use a join instead. But Spark will automatically execute a join as a broadcast join when the joined DataFrame is below a certain threshold, as explained for example in Mastering Apache Spark.

You would just need some storage where users can store the contents of these small DataFrames, e.g. as CSV files, as sketched below.
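
For illustration, a minimal sketch of that idea, assuming the UI persists each named collection as a one-column CSV file (the storage path and column name here are hypothetical, and spark is an existing SparkSession as in the question):

// Load a user-defined collection from its (hypothetical) storage location.
val countries = spark.read
  .schema("country STRING")
  .csv("/collections/countries_dict.csv")

countries.createOrReplaceTempView("countries_dict")

// The IN filter becomes a join; Spark promotes it to a broadcast join
// automatically when the small side is below
// spark.sql.autoBroadcastJoinThreshold (10 MB by default).
spark.sql(
  """SELECT u.name, u.phone, u.country
    |FROM users u JOIN countries_dict d ON u.country = d.country""".stripMargin)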

Bernhard Stadler

It doesn't really make sense to use broadcast variables here. Even ignoring structural issues, the cost of invoking a UDF will likely exceed the benefits of broadcasting (especially with such a small structure).

Either inline the values into the query, if the data is small (use your favorite SQL processing library to avoid the risk of SQL injection):

SELECT name, phone, country FROM users 
WHERE country IN ('Italy', 'France', 'United States', 'Poland', 'Spain')
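
If the query string is assembled from user input, quoting has to be handled; a minimal sketch with naive escaping (the inClause helper is hypothetical, and a proper SQL builder library remains the safer choice):

// Render a Seq of strings as a SQL IN list, doubling single quotes.
// Simplistic escaping only; not a substitute for a real SQL builder.
def inClause(values: Seq[String]): String =
  values.map(v => "'" + v.replace("'", "''") + "'").mkString("(", ", ", ")")

val query = s"SELECT name, phone, country FROM users WHERE country IN ${inClause(countriesDict)}"
spark.sql(query)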

or just convert the input to a DataFrame:

import spark.implicits._  // needed for toDF on a local Seq
countriesDict.toDF("country").createOrReplaceTempView("countries")

and use a SEMI JOIN, which keeps the users rows that have a match in countries (the IN semantics), either relying on the broadcast threshold to automatically promote it to a broadcast join if the data is small enough

SELECT * 
FROM users LEFT SEMI JOIN countries 
ON users.country = countries.country

or with an explicit broadcast hint:

SELECT /*+ MAPJOIN(countries) */ *
FROM users LEFT SEMI JOIN countries 
ON users.country = countries.country
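
For completeness, the same semi join in the DataFrame API, where the broadcast function marks the small side explicitly (a sketch reusing the users table and the countries view registered above):

import org.apache.spark.sql.functions.broadcast

// left_semi keeps users rows with a match in countries (the IN semantics);
// broadcast() forces a broadcast join regardless of the threshold.
spark.table("users")
  .join(broadcast(spark.table("countries")), Seq("country"), "left_semi")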

Finally, you could skip the SQL part and use the DataFrame API instead, either with isin:

import spark.implicits._  // for the $ column syntax
spark.table("users").where($"country" isin (countriesDict: _*))

or, if you really have logic that requires a UDF:

import org.apache.spark.sql.functions.{typedLit, udf}

val f = udf((x: String, xs: Seq[String]) => { xs.contains(x) })

spark.table("users").where(f($"country", typedLit(countriesDict)))
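
Note that typedLit embeds the whole collection into the query plan as a literal, so, like isin, this is only reasonable for small collections.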
10465355