
I want to pass a DataFrame that holds a set of values into a new query, but it fails.

1) Here I select one particular column so that I can pass it to isin in the next query:

scala> val managerIdDf=finalEmployeesDf.filter($"manager_id"!==0).select($"manager_id").distinct
managerIdDf: org.apache.spark.sql.DataFrame = [manager_id: bigint]

2) My sample data:

scala> managerIdDf.show
+----------+
|manager_id|
+----------+
|     67832|
|     65646|
|      5646|
|     67858|
|     69062|
|     68319|
|     66928|
+----------+

3) When I execute the final query, it fails:

scala> finalEmployeesDf.filter($"emp_id".isin(managerIdDf)).select("*").show
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.DataFrame [manager_id: bigint]  

I also tried converting it to a List and to a Seq, but that only produces another error. For example, when I convert it to a Seq and rerun the query, it throws:

scala> val seqDf=managerIdDf.collect.toSeq
seqDf: Seq[org.apache.spark.sql.Row] = WrappedArray([67832], [65646], [5646], [67858], [69062], [68319], [66928])

scala> finalEmployeesDf.filter($"emp_id".isin(seqDf)).select("*").show
java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray([67832], [65646], [5646], [67858], [69062], [68319], [66928])

I also referred to this post, but in vain. I am trying this type of query as a way to express subqueries with Spark DataFrames. Can anyone help?

RushHour

2 Answers


An alternative is to use DataFrames together with temp views and free-form Spark SQL. Don't worry about the logic of the examples below; they only illustrate the pattern, which should work equally well in place of your initial approach:

val df2 = Seq(
  ("Peter", "Doe", Seq(("New York", "A000000"), ("Warsaw", null))),
  ("Bob", "Smith", Seq(("Berlin", null))),
  ("John", "Jones", Seq(("Paris", null)))
).toDF("firstname", "lastname", "cities")

df2.createOrReplaceTempView("persons")

val res = spark.sql("""select * 
                         from persons 
                        where firstname
                       not in (select firstname
                                 from persons
                                where lastname <> 'Doe')""")

res.show

or

val list = List("Bob", "Daisy", "Peter")

val res2 = spark.sql("select firstname, lastname from persons")
                .filter($"firstname".isin(list:_*))

res2.show

or

val query = s"select * from persons where firstname in (${list.map ( x => "'" + x + "'").mkString(",") })"
val res3 = spark.sql(query)
res3.show

or

df2.filter($"firstname".isin(list: _*)).show

or

val list2 = df2.select($"firstname").rdd.map(r => r(0).asInstanceOf[String]).collect.toList
df2.filter($"firstname".isin(list2: _*)).show 

In your case specifically:

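// Unwrap each Row into a plain Long before handing the values to isin
val seqDf = managerIdDf.rdd.map(r => r(0).asInstanceOf[Long]).collect.toList
// isin takes varargs, so expand the list with : _*
finalEmployeesDf.filter($"emp_id".isin(seqDf: _*)).select("*").show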
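Since the goal is to emulate a subquery, a left semi join may be worth considering as well: it keeps the manager ids on the cluster instead of collecting them to the driver. The following is only a sketch under assumptions. The object name SemiJoinSketch, the helper managersOf, the alias mgr_id, and the sample rows are all made up for illustration; only emp_id and manager_id come from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SemiJoinSketch {
  // Hypothetical helper: keeps the employees whose emp_id appears as someone's manager_id.
  def managersOf(employees: DataFrame): DataFrame = {
    // Rename the key to mgr_id so the join condition is unambiguous,
    // because both sides are derived from the same DataFrame.
    val managerIds = employees
      .filter(col("manager_id") =!= 0)
      .select(col("manager_id").as("mgr_id"))
      .distinct()

    // left_semi keeps only the left-side rows that have a match,
    // and returns only the left-side columns.
    employees.join(managerIds, col("emp_id") === col("mgr_id"), "left_semi")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("semi-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up stand-in for finalEmployeesDf.
    val finalEmployeesDf = Seq(
      (67832L, 68319L),
      (68319L, 0L),
      (65646L, 68319L),
      (69000L, 67832L)
    ).toDF("emp_id", "manager_id")

    managersOf(finalEmployeesDf).show()
    spark.stop()
  }
}
```

With isin the collected values are embedded in the query as literals, which is fine for a handful of ids; the join form is usually the safer choice when the id set can grow large.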
thebluephantom

Right, you cannot pass a DataFrame to isin. isin expects the literal values to filter against, passed as varargs.

If you want an example, you can check my answer here

Following the update to your question, you can make this change:

.isin(seqDf) 

to

.isin(seqDf: _*)
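Note that : _* on its own is not enough when seqDf still holds Row objects, as it does after managerIdDf.collect.toSeq in the question: isin then receives Rows rather than values. A minimal sketch of unwrapping them first is shown here. The object name IsinFromRows, the helper firstColumnAsLongs, and the employee rows are hypothetical; the manager ids are taken from the question's sample output.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

object IsinFromRows {
  // Hypothetical helper: pulls the first column out of each collected Row as a Long.
  // Assumes that column is a non-null bigint, like manager_id in the question.
  def firstColumnAsLongs(rows: Seq[Row]): Seq[Long] = rows.map(_.getLong(0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("isin-from-rows")
      .getOrCreate()
    import spark.implicits._

    // Made-up stand-in for finalEmployeesDf.
    val finalEmployeesDf = Seq(
      (67832L, 68319L),
      (68319L, 0L),
      (69000L, 67832L)
    ).toDF("emp_id", "manager_id")

    val managerIdDf = Seq(67832L, 65646L, 68319L).toDF("manager_id")

    // collect returns Row objects, which isin cannot turn into literals.
    val seqDf: Seq[Row] = managerIdDf.collect().toSeq

    // Unwrap the Rows, then expand the values as varargs.
    val managerIds = firstColumnAsLongs(seqDf)
    finalEmployeesDf.filter(col("emp_id").isin(managerIds: _*)).show()

    spark.stop()
  }
}
```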
Chitral Verma
  • It's a little bit complex. I have modified my question; I hope that helps you answer it. – RushHour Oct 02 '18 at 18:00
  • Okay, I will try it and let you know. – RushHour Oct 03 '18 at 04:33
  • Check my code and the error: finalEmployeesDf.filter($"emp_id".isin(seqDf: _*)).select("*").show fails with java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [67832], where 67832 is one of the values in the sequence. – RushHour Oct 03 '18 at 06:59
  • I also tried converting it to a List of values, but I get the same error. – RushHour Oct 03 '18 at 07:01