0

I have number of dataframes that created inside a loop and I want to union all these dataframes. I tried to create final dataframe' that should contains all other smalldataframes, but it seams this not working because the union will hold only the last smalldataframes`. I read this similar question and the answer that provided by @zero323 the solution that has been suggested works fine when I do it in shell:

scala> val a= sql("""select "1" as k""")
a: org.apache.spark.sql.DataFrame = [k: string]

scala> val b= sql("""select "2" as k""")
b: org.apache.spark.sql.DataFrame = [k: string]

scala> val c= sql("""select "3" as k""")
c: org.apache.spark.sql.DataFrame = [k: string]

scala> a.show
+---+
|  k|
+---+
|  1|
+---+


scala> b.show
+---+
|  k|
+---+
|  2|
+---+


scala> c.show
+---+
|  k|
+---+
|  3|
+---+

Now to join the above three dataframes I did the following:

scala> val g = Seq(a,b,c)
g: Seq[org.apache.spark.sql.DataFrame] = List([k: string], [k: string], [k: string])

scala> val s = g.reduce(_ union _)
s: org.apache.spark.sql.DataFrame = [k: string]

scala> s.show
+---+
|  k|
+---+
|  1|
|  2|
|  3|
+---+

The problem

Now I am trying to do same thing on Eclipse

val g = Seq()

    val dummyDf = ss.sql(s"select 0 as ss , a.* from table1 limit 1")
    for (element <- 0 to arr.size-1) {
      var strt: Int = arr.toList(element )
      var nd: Int = arr.toList(element + 1)
      val tempDF = ss.sql(s"select $strt as ss , a.* from table1 a where rnk between $strt+1 and $nd-1")
      g :+ tempDF
    }
val finalDf = g.reduce(_ union _)

but I got the following error message:

Multiple markers at this line:

◾missing parameter type for expanded function ((x$14: , x$15) ⇒ x$14.union(x$15))

◾identifier expected but '_' found.

◾missing parameter type for expanded function ((x$14: , x$15: ) ⇒ x$14.union(x$15))

Any help with this is highly appreciated

Edit:

For the other solution that suggested in the link that I referred:

dfs match {
  case h :: Nil => Some(h)
  case h :: _   => Some(h.sqlContext.createDataFrame(
                     h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                     h.schema
                   ))
  case Nil  => None
}

where can I find the resulted union final dataframe? I ran it and the compilation went correctly, but I can not access the resulted dataframe

Rahman
  • 101
  • 1
  • 12

2 Answers2

2

where can I find the resulted union final dataframe? I ran it and the compilation went correctly, but I can not access the resulted dataframe

Answer :

This is the way to execute... finaluniondf is Option[DataFrame] you have to use .get since its an Option

  package examples

    import org.apache.log4j.Level
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object DFUnion extends App {
      val logger = org.apache.log4j.Logger.getLogger("org")
      logger.setLevel(Level.WARN)
      val spark = SparkSession.builder()
        .appName(this.getClass.getName)
        .config("spark.master", "local[*]").getOrCreate()

      import spark.implicits._
      import spark.sql
      val a= sql("""select "1" as k""")
      val b= sql("""select "2" as k""")
      val c= sql("""select "3" as k""")
      val dfs = Seq(a,b,c)
     val finaluniondf: Option[DataFrame] =  dfs match {
        case h :: Nil => Some(h)
        case h :: _   => Some(h.sqlContext.createDataFrame(
          h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
          h.schema
        ))
        case Nil  => None
      }
      println("my final union df is ")
      finaluniondf.get.show
    }

Result :

my final union df is 
+---+
|  k|
+---+
|  1|
|  2|
|  3|
+---+
Ram Ghadiyaram
  • 28,239
  • 13
  • 95
  • 121
  • Hi @Ram. It work very well. in the shell. The problem still in the Eclipse it though `value sqlContext is not a member of Nothing` error. BTW I edit my question and added the Eclipse code. – Rahman Jul 29 '19 at 18:43
  • simple classpath or import issues have you tried that in intellij all needed dependencies are present in the classpath of eclipse ? all imports were done correctly as show above ? – Ram Ghadiyaram Jul 29 '19 at 18:44
  • try this in the first line of your program to identify any issues related to classpath `val cl = ClassLoader.getSystemClassLoader cl.asInstanceOf[java.net.URLClassLoader].getURLs.foreach(println)` – Ram Ghadiyaram Jul 29 '19 at 18:50
  • No did not try it in Intellij. Also I did try add the mentioned two lines of code but nothing changed – Rahman Jul 29 '19 at 19:06
  • above 2 lines will print the classpath to understand what jars are missing – Ram Ghadiyaram Jul 29 '19 at 19:50
  • So the program has an error now(the one that mentioned above), do you mean compile it with error? If so where I expect to see the name of missed jars(if any). – Rahman Jul 29 '19 at 20:26
  • " has an error now " what s the error. you need to import java.net.URLClassLoader – Ram Ghadiyaram Jul 29 '19 at 21:28
  • Hello Ram, thanks for help. It complain on `case h :: _ => Some(h.sqlContext.createDataFrame( h.sqlContext.sparkContext.union(dfs.map(_.rdd))` the error message is `value sqlContext is not a member of Nothing` . However, I am wondering what is the wrong with the other method `reduce(_ union _)` – Rahman Jul 30 '19 at 14:59
  • some where you assingned to wrong type thats why is not a member of Nothing – Ram Ghadiyaram Jul 30 '19 at 17:57
1

The issue was with the

val g = Seq()

I need to define the type of this Seq, otherwise Scala will give me Nothing as a type ( and that was the issue).

For that I did the following

val tempDF = ss.sql(s"select 0 as ss , a.* from table1 a where rnk between $strt+1 and $nd-1") val g = Seq(tempDF)

By this the g will be Seq[DataFrame]

Thanks

Rahman
  • 101
  • 1
  • 12