
I have two DataFrames (Spark 2.2.0 and Scala 2.11.8). The first DataFrame, df1, has one column called col1, and the second, df2, also has one column, called col2. The number of rows is equal in both DataFrames.

How can I merge these two columns into a new DataFrame?

I tried join, but I think that there should be some other way to do it.

Also, I tried to apply withColumn, but it does not compile.

val result = df1.withColumn(col("col2"), df2.col1)

UPDATE:

For example:

df1 = 
col1
1
2
3

df2 = 
col2
4
5
6

result = 
col1  col2
1     4
2     5
3     6
Markus

3 Answers


If there's no actual relationship between these two columns, it sounds like you need the union operator, which will return, well, just the union of these two DataFrames:

import spark.implicits._  // for toDF on local Seqs (in scope by default in the spark-shell)

val df1 = Seq("a", "b", "c").toDF("one")
val df2 = Seq("d", "e", "f").toDF("two")

df1.union(df2).show

+---+
|one|
+---+
|  a|
|  b|
|  c|
|  d|
|  e|
|  f|
+---+

[edit] Now that you've made clear you just want the two columns side by side: with DataFrames you can use the trick of adding a row index with the function monotonically_increasing_id() and then joining on that index value:

import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._  // for toDF on local Seqs (in scope by default in the spark-shell)

val df1 = Seq("a", "b", "c").toDF("one")
val df2 = Seq("d", "e", "f").toDF("two")

// Add a synthetic id column to each DataFrame, join on it, then drop it
df1.withColumn("id", monotonically_increasing_id())
    .join(df2.withColumn("id", monotonically_increasing_id()), Seq("id"))
    .drop("id")
    .show

+---+---+
|one|two|
+---+---+
|  a|  d|
|  b|  e|
|  c|  f|
+---+---+
Chondrops
  • I need the columns to be next to each other. So, I need to get 2 columns, not one. – Markus Nov 25 '17 at 20:45
  • Should I import `monotonically_increasing_id`? – Markus Nov 26 '17 at 11:42
  • Oh, yes, you'll need to import that – Chondrops Nov 26 '17 at 11:59
  • Could you please add the `import` statement to your answer? I cannot find the `import` path for `monotonically_increasing_id`. – Markus Nov 26 '17 at 12:20
  • Added the import. Please remember to accept the answer if you end up using the solution! – Chondrops Nov 26 '17 at 12:28
  • Pretty sure this will fail if the number and sizes of the partitions are different. See [the documentation](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@monotonically_increasing_id():org.apache.spark.sql.Column). – Joe Pallas Dec 16 '17 at 20:03

It depends on what you want to do.

If you want to merge two DataFrames, you should use a join. The same join types exist as in relational algebra (or any DBMS).

You say that your DataFrames have just one column each.

In that case you might want to do a cross join (Cartesian product), which gives you a two-column table of all possible combinations of col1 and col2, or you might want the union (as mentioned by @Chondrops), which gives you a one-column table with all elements.

I think all other join types' use cases can be handled with specialized operations in Spark (in this case, two DataFrames with one column each).
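
For illustration, here is a minimal sketch of both options with one-column DataFrames (assuming a SparkSession named spark is in scope, as in the spark-shell):

import spark.implicits._  // for toDF on local Seqs

val df1 = Seq(1, 2, 3).toDF("col1")
val df2 = Seq(4, 5, 6).toDF("col2")

// Cross join: every combination of col1 and col2 (3 x 3 = 9 rows, 2 columns)
df1.crossJoin(df2).show

// Union: stacks the rows by position (6 rows, 1 column, named col1)
df1.union(df2).show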

marc_s

As far as I know, the only way to do what you want with DataFrames is by adding an index column to each using RDD.zipWithIndex and then doing a join on the index column. Code for doing zipWithIndex on a DataFrame can be found in this SO answer.
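
A rough sketch of that approach, assuming both DataFrames have the same number of rows (the helper name withRowIndex and the column name row_idx are mine, purely for illustration):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append a row index column via RDD.zipWithIndex, preserving the original schema
def withRowIndex(df: DataFrame): DataFrame = {
  val rdd = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField("row_idx", LongType, nullable = false))
  df.sparkSession.createDataFrame(rdd, schema)
}

// Join the two indexed DataFrames on the index, then drop it
val result = withRowIndex(df1).join(withRowIndex(df2), Seq("row_idx")).drop("row_idx")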

But, if the DataFrames are small, it would be much simpler to collect the two DFs in the driver, zip them together, and make the result into a new DataFrame.

[Update with example of in-driver collect/zip]

// Assumes col1/col2 hold integers; extract the values before zipping,
// since Spark cannot infer a schema for (Row, Row) pairs
val df3 = spark.createDataFrame(df1.collect().map(_.getInt(0)) zip df2.collect().map(_.getInt(0))).toDF("col1", "col2")
Joe Pallas