
I am using Spark 1.6 and I would like to know how to implement a lookup on DataFrames.

I have two DataFrames, employee and department.

  • Employee Dataframe

    Emp Id | Emp Name
    ------------------
    1 | john
    2 | David
    
  • Department Dataframe

    Dept Id | Dept Name | Emp Id
    -----------------------------
    1 | Admin | 1
    2 | HR | 2
    

I would like to look up each Emp Id from the employee table in the department table and get the Dept Name. The result set would be:

Emp Id | Dept Name
-------------------
1 | Admin
2 | HR

How do I implement this lookup as a UDF in Spark? I don't want to use a JOIN on the two DataFrames.

zero323
Prasan
  • What you need is "joining" the two dataframes... if one is very small, use a broadcast-join. – Raphael Roth Dec 22 '16 at 05:57
  • Do you have any code examples of what you have so far? – Fokko Driesprong Dec 22 '16 at 07:55
  • This is what a `join` is meant to do. What are the reasons to try to implement this by some other means? – maasg Dec 22 '16 at 10:39
  • I have already implemented this using join. But I want to explore the lookup concept as well (just to learn and see the difference in implementation and performance). Does anyone know? – Prasan Dec 22 '16 at 13:39
  • You could convert your DataFrame to a `PairRDD` (e.g. an `RDD[(Int,String)]`), which provides a `lookup` method – Raphael Roth Dec 22 '16 at 16:35

3 Answers


As already mentioned in the comments, joining the dataframes is the way to go.
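For reference, the broadcast join suggested in the comments could look roughly like the sketch below. The DataFrame and column names (`empDF`, `deptDF`, `DeptName`) are made up to match the question's tables, and the context setup is only there to keep the snippet self-contained (in spark-shell, `sc` and `sqlContext` already exist). The `broadcast` function is only a hint to the planner.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

// Reuse the running context if there is one (e.g. in spark-shell), else start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("broadcast-join-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

// Hypothetical stand-ins for the question's two tables.
val empDF = sc.parallelize(Seq((1, "John"), (2, "David"))).toDF("EmpID", "EmpName")
val deptDF = sc.parallelize(Seq((1, "Admin", 1), (2, "HR", 2)))
  .toDF("DeptID", "DeptName", "EmpID")

// broadcast() marks the small table so it can be shipped to every executor
// instead of shuffling both sides.
val result = empDF
  .join(broadcast(deptDF), Seq("EmpID"), "left_outer")
  .select("EmpID", "DeptName")

result.show()
```

The left outer join keeps employees without a department row, which is the closest equivalent to a lookup that returns null on a miss.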

You can use a lookup, but I think there is no "distributed" solution, i.e. you have to collect the lookup-table into driver memory. Also note that this approach assumes that EmpID is unique:

import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map

val emp = Seq((1,"John"),(2,"David"))
val deps = Seq((1,"Admin",1),(2,"HR",2))

val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID")

// collect the lookup table (EmpID -> name) into driver memory
val lookupMap = empRdd.collectAsMap()
// Map.get returns an Option, so a missing EmpID yields null instead of an exception
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))

val combinedDF = depsDF
  .withColumn("empNames",lookup(lookupMap)($"EmpID"))

My initial thought was to pass the empRdd to the UDF and use the lookup method defined on PairRDD, but this of course does not work, because you cannot call Spark actions (i.e. lookup) inside transformations (i.e. the UDF).
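A variation on the collected-map approach is to wrap the map in a broadcast variable, so that it is shipped to each executor once rather than serialized with every task closure. This is only a sketch: the names `lookupBc` and `lookupName`, and the extra department row with an unknown EmpID 7, are made up for illustration, and the context setup is only there to keep the snippet self-contained.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

// Reuse the running context if there is one (e.g. in spark-shell), else start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("broadcast-lookup-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

// The third row references EmpID 7, which is deliberately missing from the lookup table.
val depsDF = sc.parallelize(Seq((1, "Admin", 1), (2, "HR", 2), (3, "IT", 7)))
  .toDF("DepID", "Name", "EmpID")

// The lookup table still has to fit in driver memory before it is broadcast.
val lookupMap: Map[Int, String] = Map(1 -> "John", 2 -> "David")
val lookupBc = sc.broadcast(lookupMap)

// The UDF reads the broadcast value; Option[String] becomes a nullable string column.
val lookupName = udf((empID: Int) => lookupBc.value.get(empID))

val combinedDF = depsDF.withColumn("empName", lookupName($"EmpID"))
combinedDF.show()
```

This does not lift the memory limit: the whole map must still fit on the driver and on every executor.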

EDIT:

If your empDf has multiple columns (e.g. Name,Age), you can use this

val empRdd = empDf.rdd.map{row =>
  (row.getInt(0),(row.getString(1),row.getInt(2)))}

val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,(String,Int)]) =
  udf((empID:Int) => lookupMap.lift(empID))

depsDF
  .withColumn("lookup",lookup(lookupMap)($"EmpID"))
  .withColumn("empName",$"lookup._1")
  .withColumn("empAge",$"lookup._2")
  .drop($"lookup")
  .show()
Raphael Roth
  • Thanks. In your example, empRdd is not dataframe. How will I convert my dataframe to rdd to use collectAsMap function? – Prasan Dec 22 '16 at 18:12
  • @Prasan there is an `rdd` method on the dataframe, so you have to do something like `val empRdd = empDf.rdd.map(row => (row.getInt(0),row.getString(1)))` – Raphael Roth Dec 22 '16 at 19:26
  • Yeah but collectAsMap is not a member of RDD. When I tried using collect instead of collectAsMap, it gives me an array which I can't use in the lookup function. – Prasan Dec 22 '16 at 19:37
  • @Prasan if your RDD is of type `RDD[(Int,String)]`, then you should be able to use `collectAsMap` through implicit conversions to `PairRDDFunctions` – Raphael Roth Dec 22 '16 at 20:15
  • Is there a way to return more than one column instead of only one? Let's assume you have emp_age as well in your emp dataframe example. Is it possible to display that as well using this lookup concept? – Prasan Dec 22 '16 at 20:43
  • Thanks, it works :) But it crashes when the KEY not found with NoSuchElementException error :( – Prasan Dec 22 '16 at 20:59
  • @Prasan See my updated answer regarding the exception (use `lift` on the map). And yes, you could also have multiple "columns": just use a tuple as the "value" of your RDD / map instead of a String – Raphael Roth Dec 23 '16 at 06:04
  • Thanks, "lift" works fine. I tried to return multiple values using a list, but I am not sure how to access them afterwards.
    val empRdd = empDf.rdd.map(row => (row.getInt(0),List(row.getString(1),row.getString(2))))
    val lookupMap = empRdd.collectAsMap()
    def lookup(lookupMap:Map[Int,Any]) = udf((empID:Int) => lookupMap.lift(empID))
    I am not sure how to access the above list in the .withColumn statement below:
    .withColumn("empNames",lookup(lookupMap)($"EmpID"))
    – Prasan Dec 23 '16 at 15:29
  • @Prasan Use a tuple instead of a List. See my updated answer. – Raphael Roth Dec 23 '16 at 15:48
  • just wondering what if there are multiple empIds in the look up...is there a way to return all the matched records? – Prasan Dec 23 '16 at 22:50
  • @Raphael Roth thanks , is there anyway to do the same without "rdds" – BdEngineer Jul 20 '20 at 05:46
  • @Raphael Roth any clue on how to handle this in map in UDF https://stackoverflow.com/questions/63935600/writing-udf-for-looks-up-in-the-map-in-java-giving-unsupported-literal-type-clas – BdEngineer Sep 17 '20 at 10:37
  • Can anyone convert this line to PySpark: `def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))` – u6765 Jan 27 '22 at 09:10

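As you say you already have DataFrames, this is pretty easy. Follow these steps:

1) Create a SQLContext

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

2) Register both DataFrames as temporary tables, e.g.:

EmployeeDataframe.registerTempTable("EmpTable")
DepartmentDataframe.registerTempTable("DeptTable")

(DepartmentDataframe is assumed to be the name of your department DataFrame. On Spark 1.6 the method is registerTempTable; createOrReplaceTempView is only available from Spark 2.0.)

3) Query them with Spark SQL

val MatchingDetails = sqlContext.sql("SELECT DISTINCT E.EmpID, D.DeptName FROM EmpTable E INNER JOIN DeptTable D ON " +
  "E.EmpID = D.EmpID")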
toofrellik
  • Thanks for the response. As I mentioned in my post, I don't want to use JOIN. – Prasan Dec 22 '16 at 04:15
  • Why can't you use join when it serves the purpose? – toofrellik Dec 22 '16 at 04:17
  • This is just an example that I posted. In my real scenario, we are trying to convert a complex Informatica mapping to Spark. I am trying to replicate the lookup feature (just like Informatica) in Spark. – Prasan Dec 22 '16 at 04:18
  • @Prasan Informatica is an old product. When migrating from it, don't try to preserve/replicate "the old ways". `lookup` in Spark is very slow, while `join` in its different flavors (inner, outer, left-outer, ...) is very optimized. For the example you used, `join` is the way to go in Spark. If you have another use case, post it with enough context in a different question. – maasg Dec 22 '16 at 11:42
  • I have already implemented this using join. But I want to explore the lookup concept as well (just to learn and see the difference in implementation and performance) – Prasan Dec 22 '16 at 11:57

Starting with some "lookup" data, there are two approaches:

Method #1 -- using a lookup DataFrame

// use a DataFrame (via a join)
val lookupDF = sc.parallelize(Seq(
  ("banana",   "yellow"),
  ("apple",    "red"),
  ("grape",    "purple"),
  ("blueberry","blue")
)).toDF("SomeKeys","SomeValues")

Method #2 -- using a map in a UDF

// turn the above DataFrame into a map which a UDF uses
val Keys = lookupDF.select("SomeKeys").collect().map(_(0).toString).toList
val Values = lookupDF.select("SomeValues").collect().map(_(0).toString).toList
val KeyValueMap = Keys.zip(Values).toMap

// look up the first word of the key; null input maps to an empty string
def ThingToColor(key: String): String =
  if (key == null) ""
  else {
    val firstword = key.split(" ")(0) // fragile! assumes the key is the first word
    KeyValueMap.getOrElse(firstword, "not found!")
  }

val ThingToColorUDF = udf( ThingToColor(_: String): String )
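One caveat about building the map from two separate collect() calls: zipping Keys and Values assumes that both calls return the rows in the same order. A minimal sketch that sidesteps this assumption collects each row once and takes the key and the value from the same Row. The name `KeyValueMapOnePass` is made up for illustration, and the context setup is only there to keep the snippet self-contained.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Reuse the running context if there is one (e.g. in spark-shell), else start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lookup-map-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

// Same lookup data as in Method #1.
val lookupDF = sc.parallelize(Seq(
  ("banana",   "yellow"),
  ("apple",    "red"),
  ("grape",    "purple"),
  ("blueberry","blue")
)).toDF("SomeKeys","SomeValues")

// Each key is paired with the value from its own Row, so row order does not matter.
val KeyValueMapOnePass: Map[String, String] = lookupDF
  .select("SomeKeys", "SomeValues")
  .collect()
  .map(row => row.getString(0) -> row.getString(1))
  .toMap
```

If a key occurs more than once, `toMap` keeps the last pair it sees, so this still assumes that the keys are unique.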

Take a sample data frame of things that will be looked up:

val thingsDF = sc.parallelize(Seq(
  ("blueberry muffin"),
  ("grape nuts"),
  ("apple pie"),
  ("rutabaga pudding")
)).toDF("SomeThings")

Method #1 is to join on the lookup DataFrame

Here, rlike does the matching, and null appears where nothing matches. Both columns of the lookup DataFrame are added.

val result_1_DF = thingsDF.join(lookupDF, expr("SomeThings rlike SomeKeys"), 
                     "left_outer")

Result: 2 columns are added, with nulls where applicable.

Method #2 is to add a column using the UDF

Here, only one column is added, and the UDF can return a non-null value for keys that are not found (here "not found!"). However, if the lookup data is very large, it may fail to serialize, which is required to send the map to the workers in the cluster.

val result_2_DF = thingsDF.withColumn("AddValues",ThingToColorUDF($"SomeThings"))

Which gives you:

1 column is added with values provided by the UDF

In my case I had some lookup data that was over 1 million values, so Method #1 was my only choice.

warrens
  • Thanks warrens. How does `val KeyValueMap = Keys.zip(Values).toMap` guarantee that only the "correct" value is mapped to the "correct" key? – BdEngineer Jul 20 '20 at 05:16
  • any clue on how to handle this in map in UDF https://stackoverflow.com/questions/63935600/writing-udf-for-looks-up-in-the-map-in-java-giving-unsupported-literal-type-clas – BdEngineer Sep 17 '20 at 10:37