
I have the two datasets below, with different schemas.

case class schema1(a: Double, b: Double) -> dataset1
case class schema2(c: Double, d: Double, e: Double, f: Double) -> dataset2

I want to create another dataset with the schema below:

case class schema3(c: Double,  b: Double) -> dataset3

i.e schema3 dataset contains 1st column-c from schema 2 dataset and 2nd column-b from schema 1 dataset.

How do I create a third dataset based on schema3, using the data from column c of dataset2 and column b of dataset1?

Or, in simpler words: I have to create a third dataset by taking one column from the first dataset and another column from the second dataset, and mapping them to the third schema defined above.

Please help.


1 Answer


Use monotonically_increasing_id and row_number to add a unique, consecutive id to each row of both datasets, join the two datasets on that id column along with the required columns from each, and finally drop the id from the result dataset.

Please check the code below.

scala> case class schema1(a: Double, b: Double)
defined class schema1

scala> case class schema2(c: Double, d: Double, e: Double, f: Double)
defined class schema2

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> val sa = Seq(schema1(11,12),schema1(22,23)).toDF.withColumn("id",monotonically_increasing_id).withColumn("id",row_number().over(Window.orderBy("id")))
sa: org.apache.spark.sql.DataFrame = [a: double, b: double ... 1 more field]

scala> val sb = Seq(schema2(22,23,24,25),schema2(32,33,34,35),schema2(132,133,134,135)).toDF.withColumn("id",monotonically_increasing_id).withColumn("id",row_number().over(Window.orderBy("id")))
sb: org.apache.spark.sql.DataFrame = [c: double, d: double ... 3 more fields]

scala> sa.show(false)
+----+----+---+
|a   |b   |id |
+----+----+---+
|11.0|12.0|1  |
|22.0|23.0|2  |
+----+----+---+


scala> sb.show(false)
+-----+-----+-----+-----+---+
|c    |d    |e    |f    |id |
+-----+-----+-----+-----+---+
|22.0 |23.0 |24.0 |25.0 |1  |
|32.0 |33.0 |34.0 |35.0 |2  |
|132.0|133.0|134.0|135.0|3  |
+-----+-----+-----+-----+---+

scala> sb.select("c","id").join(sa.select("b","id"),Seq("id"),"full").drop("id").show(false)
+-----+----+
|c    |b   |
+-----+----+
|22.0 |12.0|
|32.0 |23.0|
|132.0|null|
+-----+----+
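
If you need a typed Dataset[schema3] rather than a DataFrame, you can map the join result back to the case class from the question. A minimal sketch (spark-shell has spark.implicits._ in scope by default; an inner join is used here because with the full join b can be null, which cannot be decoded into a primitive Double):

scala> case class schema3(c: Double, b: Double)
defined class schema3

scala> // sketch: inner join avoids nulls that the primitive Double field b cannot hold
scala> val ds3 = sb.select("c","id").join(sa.select("b","id"),Seq("id"),"inner").drop("id").as[schema3]
ds3: org.apache.spark.sql.Dataset[schema3] = [c: double, b: double]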

Srinivas
  • 8,957
  • 2
  • 12
  • 26
  • This won't always work since `monotonically_increasing_id` does not guarantee consecutive numbers, see here for example: https://stackoverflow.com/questions/48209667/using-monotonically-increasing-id-for-assigning-row-number-to-pyspark-datafram – Shaido May 08 '20 at 08:34
  • Thanks for correcting, I have added row_number on top of monotonically_increasing_id. – Srinivas May 08 '20 at 08:42
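
As the first comment points out, monotonically_increasing_id alone does not guarantee consecutive ids, which is why row_number is layered on top; note, though, that Window.orderBy without a partitionBy pulls all rows into a single partition. An alternative sketch that avoids the Window entirely is RDD zipWithIndex, assuming a SparkSession named spark and the sa/sb DataFrames from above (without their id columns):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append a consecutive 0-based row id to each row without a Window.
def withRowId(df: DataFrame): DataFrame = {
  val rows   = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField("id", LongType, nullable = false))
  spark.createDataFrame(rows, schema)
}

// Both datasets get the same 0-based ids, so the join lines rows up as before.
val result = withRowId(sb).select("c", "id")
  .join(withRowId(sa).select("b", "id"), Seq("id"), "full")
  .drop("id")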