1

I have two dataframes as follows:

df1 (reference data)

Tempe, AZ, USA
San Jose, CA, USA
Mountain View, CA, USA
New York, NY, USA

df2 (User entered data)

Tempe, AZ
Tempe, Arizona
San Jose, USA
San Jose, CA
Mountain View, CA

I would like to get a dataframe (df3) as follows:

-------------------------------------------
|Tempe, AZ, USA        | Tempe, Arizona   |
|Tempe, AZ, USA        | Tempe, AZ        |
|San Jose, CA, USA     | San Jose, CA     |
|San Jose, CA, USA     | San Jose, USA    |
|Mountain View, CA, USA| Mountain View, CA|
-------------------------------------------

I already have a User Defined Function:

def isSameAs(str1: String, str2: String): Boolean = {
    ......
}

that takes two strings (user-entered data and reference data) and tells me whether or not they are a match.

I just need to find out the right way to implement this mapping in Scala Spark SQL so that I get a dataframe like df3.
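For the answer snippets below to be runnable end to end, any implementation with that signature will do; a toy stand-in (purely illustrative, it does not capture the real matching logic, e.g. "Arizona" vs "AZ") could be:

// Toy stand-in for isSameAs: two addresses match when the comma-separated
// tokens of one are a subset of the other's. Illustrative only.
def isSameAs(str1: String, str2: String): Boolean = {
  def tokens(s: String): Set[String] = s.split(",").map(_.trim.toLowerCase).toSet
  tokens(str1).subsetOf(tokens(str2)) || tokens(str2).subsetOf(tokens(str1))
}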

Saqib Ali

2 Answers

2

Option 1: You can use a UDF as the join expression:

import org.apache.spark.sql.functions._
val isSameAsUdf = udf(isSameAs(_,_))
val result = df1.join(df2, isSameAsUdf(df1.col("address"), df2.col("address")))

The downside of this approach is that Spark performs a cartesian product over both dataframes df1 and df2 and then filters out the rows that do not match the join condition afterwards (more details here). Running result.explain prints

== Physical Plan ==
CartesianProduct UDF(address#4, address#10)
:- LocalTableScan [address#4]
+- LocalTableScan [address#10]
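One way to mitigate this, if the reference data in df1 is small, is to add a broadcast hint; with a non-equi condition Spark then typically plans a BroadcastNestedLoopJoin instead of a CartesianProduct. A sketch, reusing the same UDF:

import org.apache.spark.sql.functions.broadcast

// Hint that df1 is small enough to ship to every executor; the UDF is then
// evaluated locally against each partition of df2 instead of going through
// a full cartesian product operator.
val resultHinted = df2.join(broadcast(df1),
  isSameAsUdf(df1.col("address"), df2.col("address")))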

Option 2: to avoid the cartesian product, it might be faster to broadcast the reference data as a standard Scala sequence and then do the mapping of the addresses in another UDF:

val normalizedAddress: Seq[String] = // content of df1 (the reference data) as a Scala sequence
val broadcastSeq = spark.sparkContext.broadcast(normalizedAddress)

def toNormalizedAddress(str: String): String =
    broadcastSeq.value.find(isSameAs(_, str)).getOrElse("")
val toNormalizedAddressUdf = udf(toNormalizedAddress(_))

val result2 = df2.withColumn("NormalizedAddress", toNormalizedAddressUdf('address))

The result is the same as for option 1, but result2.explain prints

== Physical Plan ==
LocalTableScan [address#10, NormalizedAddress#40]

This second option works if the amount of reference data is small enough to be broadcast. Depending on the cluster's hardware, some tens of thousands of lines of reference data would still be considered small.
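A sketch of how normalizedAddress could be built from df1 (assuming the reference column is named address, as above, and spark is the active SparkSession):

import spark.implicits._

// Collect the reference addresses to the driver and broadcast them to the
// executors; only viable while the reference data stays small.
val normalizedAddress: Seq[String] =
  df1.select("address").as[String].collect().toSeq
val broadcastSeq = spark.sparkContext.broadcast(normalizedAddress)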

werner
  • Originally I had a cartesian product, which was working well, but as the data increased it stopped scaling on the machines we have. For Option 2: is there a way to NOT emit a row if there is no normalizedAddress for a given user input, i.e. we want to ignore user-entered addresses that cannot be normalized? – Saqib Ali Jun 27 '20 at 17:44
  • in the definition of `toNormalizedAddress` a default value is set, by calling `getOrElse`, when no value in the reference data is found. In my code, this value is set to an empty string. But it would be possible to set it to any other value (maybe `NOT_FOUND`) and then filter out these rows afterwards; a sketch follows after these comments. – werner Jun 27 '20 at 18:31
  • if I filter afterwards, wouldn't I still be dealing with a very large data set before the filter? – Saqib Ali Jun 28 '20 at 04:34
  • both dataframes `df2` and `result2` have the same number of rows. The execution of `withColumn` only adds an additional field to each row; the number of rows stays unchanged. – werner Jun 28 '20 at 11:01
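A sketch of the filtering step described in the comments above, assuming the empty-string default from option 2:

import org.apache.spark.sql.functions.col

// Drop user-entered addresses that could not be matched to any reference
// address (they carry the empty-string default from getOrElse).
val matchedOnly = result2.filter(col("NormalizedAddress") =!= "")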
1

Assuming the schema below (address: string), try this:

Load the data

    import org.apache.spark.sql.functions._
    import spark.implicits._ // needed for toDF on a local Seq and for $"..."

    val data1 =
      """Tempe, AZ, USA
        |San Jose, CA, USA
        |Mountain View, CA, USA""".stripMargin
    val df1 = data1.split(System.lineSeparator()).toSeq.toDF("address")
    df1.show(false)
    /**
      * +----------------------+
      * |address               |
      * +----------------------+
      * |Tempe, AZ, USA        |
      * |San Jose, CA, USA     |
      * |Mountain View, CA, USA|
      * +----------------------+
      */

    val data2 =
      """Tempe, AZ
        |Tempe, Arizona
        |San Jose, USA
        |San Jose, CA
        |Mountain View, CA""".stripMargin

    val df2 = data2.split(System.lineSeparator()).toSeq.toDF("address")
    df2.show(false)

    /**
      * +-----------------+
      * |address          |
      * +-----------------+
      * |Tempe, AZ        |
      * |Tempe, Arizona   |
      * |San Jose, USA    |
      * |San Jose, CA     |
      * |Mountain View, CA|
      * +-----------------+
      */

Extract the joining key and join based on it:


    df1.withColumn("joiningKey", substring_index($"address", ",", 1))
      .join(
        df2.withColumn("joiningKey", substring_index($"address", ",", 1)),
        "joiningKey"
      )
      .select(df1("address"), df2("address"))
      .show(false)

    /**
      * +----------------------+-----------------+
      * |address               |address          |
      * +----------------------+-----------------+
      * |Tempe, AZ, USA        |Tempe, AZ        |
      * |Tempe, AZ, USA        |Tempe, Arizona   |
      * |San Jose, CA, USA     |San Jose, USA    |
      * |San Jose, CA, USA     |San Jose, CA     |
      * |Mountain View, CA, USA|Mountain View, CA|
      * +----------------------+-----------------+
      */
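If the first-token key alone is too coarse (see the comments below), it can be combined with the isSameAs UDF from the question, so the expensive comparison only runs on pairs that already share a key. A sketch:

    // Wrap the matching function from the question in a UDF.
    val isSameAsUdf = udf(isSameAs _)

    val ref  = df1.withColumn("joiningKey", substring_index($"address", ",", 1))
    val user = df2.withColumn("joiningKey", substring_index($"address", ",", 1))

    // The equality on joiningKey keeps this a regular equi-join; isSameAs
    // then refines the candidate pairs instead of running over a full
    // cartesian product.
    ref.join(user,
        ref("joiningKey") === user("joiningKey") &&
          isSameAsUdf(ref("address"), user("address")))
      .select(ref("address"), user("address"))
      .show(false)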
Som
  • Hi, I am not following your answer. Where do I use my isSameAs function in the JOIN? – Saqib Ali Jun 27 '20 at 07:31
  • I have added one more column `joiningKey` derived from the original address column of the dataframe. Spark will internally join the `joiningKey` column from df1 with the same column from df2. I don't think you need a method `isSameAs` here, since Spark will do that internally for you. – Som Jun 27 '20 at 08:54
  • @SomeshwarKale actually you are only comparing the parts before the first `,` of the address strings, correct? – werner Jun 27 '20 at 13:17
  • @werner, yes. But this is what is expected to get the output, no? – Som Jun 27 '20 at 14:06
  • @SomeshwarKale isSameAs implements complex logic that takes into account more than just the location of the ",", so I need to use it. – Saqib Ali Jun 27 '20 at 17:28