
I have two DataFrames with the same number of rows, but the number of columns differs and is dynamic, depending on the source.

The first DataFrame contains all the columns, but the second DataFrame has been filtered and processed and is missing some of them.

I need to pick a specific column from the first DataFrame and add/merge it into the second DataFrame.

val sourceDf = spark.read.load(parquetFilePath)
val resultDf = spark.read.load(resultFilePath)

val columnName: String = "Col1"

I tried to add it in several ways; here are just a few of them:

val modifiedResult = resultDf.withColumn(columnName, sourceDf.col(columnName))

val modifiedResult = resultDf.withColumn(columnName, sourceDf(columnName))
val modifiedResult = resultDf.withColumn(columnName, labelColumnUdf(sourceDf.col(columnName)))

None of these work; as far as I can tell, Spark cannot resolve a column that belongs to a different DataFrame inside withColumn.

Can you please help me merge/add the column from the first DataFrame into the second DataFrame?

The given example is not my exact data structure, but resolving it will fulfill my requirement.

Sample input and output:

Source DataFrame:
+--------+
|InputGas|
+--------+
|    1000|
|    2000|
|    3000|
|    4000|
+--------+

Result DataFrame:
+----+-------+-----+
|Time|CalcGas|Speed|
+----+-------+-----+
|   0|    111| 1111|
|   0|    222| 2222|
|   1|    333| 3333|
|   2|    444| 4444|
+----+-------+-----+

Expected Output:
+----+-------+-----+--------+
|Time|CalcGas|Speed|InputGas|
+----+-------+-----+--------+
|   0|    111| 1111|    1000|
|   0|    222| 2222|    2000|
|   1|    333| 3333|    3000|
|   2|    444| 4444|    4000|
+----+-------+-----+--------+
– Yousuf Zaman

1 Answer

One way to achieve this is with a join.

If you have a common column in both DataFrames, you can perform a join on that column and get your desired result.

Example:

import sparkSession.sqlContext.implicits._

val df1 = Seq((1, "Anu"),(2, "Suresh"),(3, "Usha"), (4, "Nisha")).toDF("id","name")
val df2 = Seq((1, 23),(2, 24),(3, 24), (4, 25), (5, 30), (6, 32)).toDF("id","age")

val df = df1.as("df1")
  .join(df2.as("df2"), df1("id") === df2("id"))
  .select("df1.id", "df1.name", "df2.age")
df.show()

Output:

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|   Anu| 23|
|  2|Suresh| 24|
|  3|  Usha| 24|
|  4| Nisha| 25|
+---+------+---+

Update:

If you don't have a unique id common to both DataFrames, create one and use it:

import sparkSession.sqlContext.implicits._
import org.apache.spark.sql.functions._

var sourceDf = Seq(1000, 2000, 3000, 4000).toDF("InputGas")
var resultDf  = Seq((0, 111, 1111), (0, 222, 2222), (1, 333, 3333), (2, 444, 4444)).toDF("Time", "CalcGas", "Speed")

sourceDf = sourceDf.withColumn("rowId1", monotonically_increasing_id())
resultDf = resultDf.withColumn("rowId2", monotonically_increasing_id())

val df = sourceDf.as("df1")
  .join(resultDf.as("df2"), sourceDf("rowId1") === resultDf("rowId2"), "inner")
  .select("df1.InputGas", "df2.Time", "df2.CalcGas", "df2.Speed")
df.show()

Output:

+--------+----+-------+-----+
|InputGas|Time|CalcGas|Speed|
+--------+----+-------+-----+
|    1000|   0|    111| 1111|
|    2000|   0|    222| 2222|
|    3000|   1|    333| 3333|
|    4000|   2|    444| 4444|
+--------+----+-------+-----+
– Prasad Khode
  • Note that this will not always work (although it will for small dataframes). `monotonically_increasing_id` only guarantees that the numbers are increasing; it does not guarantee which numbers are used. Hence, the numbers given to the two dataframes can be different (see the sketch after these comments). – Shaido Nov 01 '17 at 01:26
  • @Shaido yes totally agreed – Prasad Khode Feb 28 '19 at 05:54
  • In the first example, why is it necessary to do `.as("df1")` if that's already the dataframe's name? – David Feldman Dec 07 '19 at 20:41
  • it is to give alias name to df1 so that I can use this alias name in my `select` operation – Prasad Khode Dec 08 '19 at 10:08
  • https://stackoverflow.com/questions/47894877/spark-monotonically-increasing-id-not-working-as-expected-in-dataframe – Shashank Mar 12 '20 at 00:44
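As Shaido's comment notes, `monotonically_increasing_id` only guarantees increasing ids within each DataFrame, so the ids generated for the two DataFrames may not line up. A more robust way to pair rows purely by position is RDD `zipWithIndex`, which always assigns consecutive indexes 0, 1, 2, ... Below is a minimal sketch, not part of the original answer; the helper name `withRowIndex` is made up here, and it assumes both DataFrames have the same number of rows and a stable row order:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper: append a position-based "rowIdx" column.
// RDD.zipWithIndex assigns consecutive indexes 0, 1, 2, ..., so the
// same row position receives the same index in both DataFrames.
def withRowIndex(df: DataFrame): DataFrame = {
  val schema = StructType(df.schema.fields :+ StructField("rowIdx", LongType, nullable = false))
  val indexed = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(indexed, schema)
}

val merged = withRowIndex(resultDf)
  .join(withRowIndex(sourceDf), "rowIdx")
  .drop("rowIdx")
merged.show()

Note that `zipWithIndex` may trigger an extra Spark job to compute partition offsets, but unlike `monotonically_increasing_id` the resulting indexes are guaranteed to match row positions.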