I have four DataFrames in Spark with Scala (Spark 2.3, spark-sql 2.11, Scala 2.11.0):

ratingsDf

+-------+---+
|ratings| id|
+-------+---+
|      0|  1|
|      1|  2|
|      1|  3|
|      0|  4|
|      0|  5|
|      1|  6|
|      1|  7|
|      1|  8|
|      0|  9|
|      1| 10|
+-------+---+

GpredictionsDf

+-----------+---+
|gprediction| id|
+-----------+---+
|          0|  1|
|          1|  2|
|          1|  3|
|          1|  4|
|          1|  5|
|          1|  6|
|          1|  7|
|          1|  8|
|          0|  9|
|          1| 10|
+-----------+---+

RpredictionsDf

+-----------+---+
|rprediction| id|
+-----------+---+
|          0|  1|
|          1|  2|
|          1|  3|
|          1|  4|
|          1|  5|
|          1|  6|
|          1|  7|
|          1|  8|
|          1|  9|
|          1| 10|
+-----------+---+

LpredictionsDf

+-----------+---+
|lprediction| id|
+-----------+---+
|          0|  1|
|          1|  2|
|          1|  3|
|          0|  4|
|          1|  5|
|          1|  6|
|          1|  7|
|          1|  8|
|          0|  9|
|          1| 10|
+-----------+---+

I need to create a DataFrame by joining all four DataFrames on the "id" column. I tried the two approaches below:

**Method 1:**

val ensembleDf = GpredictionsDf.join(rpredjoin, gpredjoin("id") === RpredictionsDf("id"))
                               .join(LpredictionsDf, LpredictionsDf("id") === RpredictionsDf("id"))
                               .join(ratingsDf, ratingsDf("id") === RpredictionsDf("id"))
                               .select("gprediction", "rprediction", "lprediction", "ratings")

**Method 2:**

ratingsDf.createOrReplaceTempView("ratingjoin");
GpredictionsDf.createOrReplaceTempView("gpredjoin")
RpredictionsDf.createOrReplaceTempView("rpredjoin")
LpredictionsDf.createOrReplaceTempView("lpredjoin")    


val ensembleDf = sqlContext.sql("SELECT gprediction, rprediction, lprediction, ratings FROM gpredjoin, rpredjoin, lpredjoin, ratingjoin WHERE " +
  "gpredjoin.id = rpredjoin.id AND rpredjoin.id = lpredjoin.id AND lpredjoin.id = ratingjoin.id");

However, in both cases the join fails and returns an empty result:

ensembleDf.show();

+-----------+-----------+-----------+-------+
|gprediction|rprediction|lprediction|ratings|
+-----------+-----------+-----------+-------+
+-----------+-----------+-----------+-------+

Any idea why this could be happening? What code changes do I need to make to fix it?

Nick
  • Could you please follow the instructions from [How to make good reproducible Apache Spark Dataframe examples](https://stackoverflow.com/q/48427185/10465355) and include reproducible data and Spark version? Thanks. – 10465355 Nov 25 '18 at 23:24
  • I have updated it accordingly – Nick Nov 25 '18 at 23:43
  • All of these including rpredjoin and gpredjoin are dataframes only. There are no hive tables here – Nick Nov 26 '18 at 00:59
  • Your joins in Method 1 look correct except that temp views were being mixed with dataframes. Replacing `GpredictionsDf.join(rpredjoin, gpredjoin("id") === RpredictionsDf("id"))` with `GpredictionsDf.join(RpredictionsDf, GpredictionsDf("id") === RpredictionsDf("id"))` should fix the problem. – Leo C Nov 26 '18 at 01:46
  • I changed it to `val ensemble = GpredictionsDf.join(RpredictionsDf, GpredictionsDf("id") === RpredictionsDf("id")).join(LpredictionsDf, LpredictionsDf("id") === RpredictionsDf("id")).join(ratingsDf, ratingsDf("id") === RpredictionsDf("id")).select("gprediction", "rprediction", "lprediction", "ratings")`, but it still returns an empty dataset. – Nick Nov 26 '18 at 03:01

1 Answer

scala> val ratingsDf = Seq((0,1),(1,2),(1,3),(0,4),(0,5),(1,6),(1,7),(1,8),(0,9),(1,10)).toDF("ratings","id")

scala> val GpredictionsDf = Seq((0,1),(1,2),(1,3),(1,4),(1,5),(1,6),(1,7),(1,8),(0,9),(1,10)).toDF("gprediction", "id")

scala> val RpredictionsDf = Seq((0,1),(1,2),(1,3),(1,4),(1,5),(1,6),(1,7),(1,8),(1,9),(1,10)).toDF("rprediction", "id")

scala> val LpredictionsDf = Seq((0,1),(1,2),(1,3),(0,4),(1,5),(1,6),(1,7),(1,8),(0,9),(1,10)).toDF("lprediction", "id")

scala> val ensembleDf = GpredictionsDf.join(RpredictionsDf, GpredictionsDf("id") === RpredictionsDf("id") ).join(LpredictionsDf, LpredictionsDf("id") === RpredictionsDf("id")).join(ratingsDf, ratingsDf("id") === RpredictionsDf("id")).select("gprediction", "rprediction", "lprediction", "ratings")

scala> ensembleDf.show
    +-----------+-----------+-----------+-------+
    |gprediction|rprediction|lprediction|ratings|
    +-----------+-----------+-----------+-------+
    |          0|          0|          0|      0|
    |          1|          1|          1|      1|
    |          1|          1|          1|      1|
    |          1|          1|          0|      0|
    |          1|          1|          1|      0|
    |          1|          1|          1|      1|
    |          1|          1|          1|      1|
    |          1|          1|          1|      1|
    |          0|          1|          0|      0|
    |          1|          1|          1|      1|
    +-----------+-----------+-----------+-------+

This is what I tried, and it gives the correct values. I recommend checking the DataFrames you are passing to the join.
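One way to check the input DataFrames is sketched below. It is only an illustration: `JoinDiagnostics`, `describe`, and `sharedKeys` are hypothetical names, not Spark APIs, and the sketch assumes the empty result comes from the inputs (for example, `id` columns that have different types or no values in common) rather than from the join expression itself.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object; not part of Spark or of the code in the question.
object JoinDiagnostics {

  // Prints the data type of the key column and the row count of one input.
  def describe(name: String, df: DataFrame, key: String = "id"): Unit = {
    val keyType = df.schema(key).dataType.simpleString
    println(s"$name: $key is $keyType, rows = ${df.count()}")
  }

  // Counts the distinct key values that appear in both inputs.
  // Keys are cast to string so that differing numeric types do not hide an overlap.
  def sharedKeys(left: DataFrame, right: DataFrame, key: String = "id"): Long =
    left.select(col(key).cast("string"))
      .intersect(right.select(col(key).cast("string")))
      .count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("join-diagnostics")
      .getOrCreate()
    import spark.implicits._

    // First three rows of two of the sample DataFrames from the question.
    val ratingsDf = Seq((0, 1), (1, 2), (1, 3)).toDF("ratings", "id")
    val GpredictionsDf = Seq((0, 1), (1, 2), (1, 3)).toDF("gprediction", "id")

    describe("ratingsDf", ratingsDf)
    describe("GpredictionsDf", GpredictionsDf)

    // A count of 0 here would mean an inner join on "id" must be empty.
    println(s"shared ids = ${sharedKeys(ratingsDf, GpredictionsDf)}")

    spark.stop()
  }
}
```

If the shared-key count between any pair of inputs is 0, those inputs have no ids in common, so an inner join across all four is necessarily empty regardless of how the join is written.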

Sathiyan S