
I have the following SparkSQL:

val resultDf = spark.sql("SELECT name, phone, country FROM users")

I'd like to filter the returned records, keeping only those whose country is in the following collection:

val countries = Seq("Italy", "France", "United States", "Poland", "Spain")

For example, I can create a broadcast variable from the collection:

val countriesBroadcast = sc.broadcast(countries)

but is it possible (and if so, how) to use the countriesBroadcast variable inside my SQL query?

alexanoid
    Possible duplicate of [How to refer broadcast variable in dataframes](https://stackoverflow.com/questions/41337553/how-to-refer-broadcast-variable-in-dataframes) – Taha Naqvi Nov 09 '18 at 09:55

2 Answers


It is not possible, except inside UserDefinedFunctions, UserDefinedAggregateFunctions and Aggregators (i.e. non-declarative code).
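As a minimal sketch of the UDF exception, assuming Spark 2.x with a local `SparkSession` and a small hypothetical in-memory `users` table standing in for the real one, the broadcast collection can be read inside a UDF that is then registered for use in SQL. The UDF name `inCountries` and the object name `BroadcastUdfSketch` are illustrative, not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastUdfSketch {
  // Returns the names of users whose country is in the broadcast collection.
  def filteredNames(spark: SparkSession): Seq[String] = {
    import spark.implicits._

    // Hypothetical sample rows standing in for the real "users" table
    Seq(("a", "123", "India"), ("b", "234", "Italy"), ("c", "526", "France"))
      .toDF("name", "phone", "country")
      .createOrReplaceTempView("users")

    val countries = Seq("Italy", "France", "United States", "Poland", "Spain")
    // Broadcast a Set so each executor receives one read-only copy with fast lookups
    val countriesBroadcast = spark.sparkContext.broadcast(countries.toSet)

    // The UDF closure captures the Broadcast handle; .value is read on the executors
    spark.udf.register("inCountries",
      (country: String) => country != null && countriesBroadcast.value.contains(country))

    val resultDf = spark.sql(
      "SELECT name, phone, country FROM users WHERE inCountries(country)")
    resultDf.select("name").as[String].collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-udf").getOrCreate()
    try println(filteredNames(spark).mkString(", "))
    finally spark.stop()
  }
}
```

One trade-off: the optimizer treats the UDF as a black box, so this filter cannot be pushed down to the data source.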

To use broadcasting with the DataFrame / SQL API, you should turn the collection into a DataFrame and use a broadcast hint (see Spark SQL broadcast hash join).
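A sketch of the broadcast-hint route expressed purely in SQL, assuming Spark 2.2 or later (where `/*+ BROADCAST(...) */` hints are accepted in SQL queries) and the same hypothetical in-memory `users` rows. The collection becomes a small `countries` view, and a `LEFT SEMI JOIN` (chosen here for filtering) keeps only matching `users` rows without adding columns. The object name is illustrative.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastHintSketch {
  def filteredNames(spark: SparkSession): Seq[String] = {
    import spark.implicits._

    // Hypothetical sample rows standing in for the real "users" table
    Seq(("a", "123", "India"), ("b", "234", "Italy"), ("c", "526", "France"))
      .toDF("name", "phone", "country")
      .createOrReplaceTempView("users")

    // Register the small collection as a one-column temp view
    Seq("Italy", "France", "United States", "Poland", "Spain")
      .toDF("country")
      .createOrReplaceTempView("countries")

    // BROADCAST(c) asks the planner to ship the small "countries" side to every executor;
    // LEFT SEMI JOIN returns only users columns, one row per matching user
    val resultDf = spark.sql(
      """SELECT /*+ BROADCAST(c) */ u.name, u.phone, u.country
        |FROM users u LEFT SEMI JOIN countries c ON u.country = c.country""".stripMargin)

    resultDf.explain() // prints the physical plan so the chosen join strategy can be checked
    resultDf.select("name").as[String].collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-hint").getOrCreate()
    try println(filteredNames(spark).mkString(", "))
    finally spark.stop()
  }
}
```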


In the Spark DataFrame API, we can broadcast the entire (small) table and join it with the target table to get the desired output. Here is example code.

Imports

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

Code

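// Local SparkSession for running the example
val spark = SparkSession.builder().master("local").getOrCreate()

// Load the users CSV (with a header row) and expose it to SQL as "users"
val df = spark.read.option("header", true).csv("data/user.txt")
df.createOrReplaceTempView("users")

// Turn the Scala collection into a one-column DataFrame registered as "countries"
val countries = Seq("Italy", "France", "United States", "Poland", "Spain")
import spark.implicits._
spark.sparkContext.parallelize(countries, 1).toDF("country").createOrReplaceTempView("countries")

// Mark the small "countries" side for broadcast and inner-join it with "users" on "country"
broadcast(spark.table("countries")).join(spark.table("users"), "country").show()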

"data/user.txt" file contents

 name,phone,country
 a,123,India
 b,234,Italy
 c,526,France
 d,765,India

Code output:

+-------+----+-----+
|country|name|phone|
+-------+----+-----+
|  Italy|   b|  234|
| France|   c|  526|
+-------+----+-----+

Note: code tested with Spark 2.2 and Scala 2.11
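For comparison, a sketch of the same DataFrame approach using a `left_semi` join instead of the inner join above, assuming the rows of `data/user.txt` are held in memory (the object name is hypothetical). A semi join returns only the `users` columns and cannot duplicate a user row, even if the country list contains repeats.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastSemiJoinSketch {
  def filteredNames(spark: SparkSession): Seq[String] = {
    import spark.implicits._

    // Hypothetical in-memory copy of the rows from "data/user.txt"
    val users = Seq(("a", "123", "India"), ("b", "234", "Italy"),
                    ("c", "526", "France"), ("d", "765", "India"))
      .toDF("name", "phone", "country")

    val countries = Seq("Italy", "France", "United States", "Poland", "Spain").toDF("country")

    // broadcast() hints that the small countries side should be sent to every executor;
    // "left_semi" keeps each users row that has at least one matching country
    val resultDf = users.join(broadcast(countries), Seq("country"), "left_semi")
    resultDf.show()
    resultDf.select("name").as[String].collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-semi").getOrCreate()
    try println(filteredNames(spark).mkString(", "))
    finally spark.stop()
  }
}
```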