
I have two PySpark dataframes with different columns; one of them holds the row indexes (the record-ID pairs), as follows:

+------------+--------------+
|     rec_id1|       rec_id2|
+------------+--------------+
|rec-3301-org|rec-3301-dup-0|
|rec-2994-org|rec-2994-dup-0|
|rec-2106-org|rec-2106-dup-0|
|rec-3771-org|rec-3771-dup-0|
|rec-3886-org|rec-3886-dup-0|
| rec-974-org| rec-974-dup-0|
| rec-224-org| rec-224-dup-0|
|rec-1826-org|rec-1826-dup-0|
| rec-331-org| rec-331-dup-0|
|rec-4433-org|rec-4433-dup-0|
+------------+--------------+

+----------+-------+-------------+------+-----+-------+
|given_name|surname|date_of_birth|suburb|state|address|
+----------+-------+-------------+------+-----+-------+
|         0|    1.0|            1|     1|    1|    1.0|
|         0|    1.0|            0|     1|    1|    1.0|
|         0|    1.0|            1|     1|    1|    0.0|
|         0|    1.0|            1|     1|    1|    1.0|
|         0|    1.0|            1|     1|    1|    1.0|
|         0|    1.0|            1|     1|    1|    1.0|
|         0|    1.0|            1|     1|    1|    1.0|
|         0|    1.0|            0|     1|    1|    1.0|
|         0|    1.0|            1|     1|    1|    1.0|
|         0|    1.0|            1|     0|    1|    1.0|
+----------+-------+-------------+------+-----+-------+

I would like to merge the two PySpark dataframes into one, so that the new dataframe looks like this:

                             given_name  surname   ...     state  address
rec_id_1     rec_id_2                              ...                   
rec-3301-org rec-3301-dup-0           0      1.0   ...         1      1.0
rec-2994-org rec-2994-dup-0           0      1.0   ...         1      1.0
rec-2106-org rec-2106-dup-0           0      1.0   ...         1      0.0

Assume same number of rows.
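For reproducibility, here is a minimal sketch of how the two dataframes above could be constructed (only the first three rows are shown; the names df_ids and df_features are assumed, not from the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# assumed name for the dataframe holding the record-ID pairs
df_ids = spark.createDataFrame([
    ("rec-3301-org", "rec-3301-dup-0"),
    ("rec-2994-org", "rec-2994-dup-0"),
    ("rec-2106-org", "rec-2106-dup-0"),
    ], ("rec_id1", "rec_id2"))

# assumed name for the dataframe holding the feature columns
df_features = spark.createDataFrame([
    (0, 1.0, 1, 1, 1, 1.0),
    (0, 1.0, 0, 1, 1, 1.0),
    (0, 1.0, 1, 1, 1, 0.0),
    ], ("given_name", "surname", "date_of_birth", "suburb", "state", "address"))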


1 Answer


If both dataframes have the same number of rows, you can add a temporary column to each dataframe that contains a generated ID, and join the two dataframes on this column. The example below uses two dataframes with identical values in each column but different column names, so the combined result should contain 8 columns with the corresponding values.

from pyspark.sql.functions import monotonically_increasing_id

test_df = spark.createDataFrame([
    (1, "2", 5, 1), (3, "4", 7, 8), (10, "11", 12, 13),
    ], ("col1", "col2", "col3", "col4"))

test_df2 = spark.createDataFrame([
    (1, "2", 5, 1), (3, "4", 7, 8), (10, "11", 12, 13),
    ], ("col5", "col6", "col7", "col8"))

# add a generated ID column to each dataframe, join on it, then drop it
test_df = test_df.withColumn("id", monotonically_increasing_id())
test_df2 = test_df2.withColumn("id", monotonically_increasing_id())

test_df.join(test_df2, "id", "inner").drop("id").show()

Result:

+----+----+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|col7|col8|
+----+----+----+----+----+----+----+----+
|   1|   2|   5|   1|   1|   2|   5|   1|
|   3|   4|   7|   8|   3|   4|   7|   8|
|  10|  11|  12|  13|  10|  11|  12|  13|
+----+----+----+----+----+----+----+----+
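Applied to the dataframes from the question, the same pattern would look roughly like this (df_ids and df_features are the assumed names used above; this is a sketch, not code from the original answer):

from pyspark.sql.functions import monotonically_increasing_id

# add the same kind of generated ID to both dataframes, join on it, then drop it
df_ids = df_ids.withColumn("id", monotonically_increasing_id())
df_features = df_features.withColumn("id", monotonically_increasing_id())

merged = df_ids.join(df_features, "id", "inner").drop("id")
merged.show()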
  • AFAIK, the ids are based on the number of partitions. So if the two DataFrames have a different number of partitions, this won't be guaranteed to work. – pault Sep 14 '18 at 13:23
  • It worked! Note that it is vital to add `from pyspark.sql import functions as F` in order to use the function `monotonically_increasing_id()`, i.e. `F.monotonically_increasing_id()` – Taiwotman Sep 14 '18 at 14:00
  • What if the rows are not the same? Not sure it will work either. – Taiwotman Sep 14 '18 at 14:07
  • I would guess if you have two dataframes with n and m rows and join them like this, you would receive the concatenated dataframe with n rows if n < m. – gaw Sep 15 '18 at 18:12
  • @pault, I have this problem, the two dataframes have a different number of partitions, and the result of merging is wrong. Do you have any idea to fix this problem? – gaussclb May 19 '19 at 10:40
  • @gaussclb hard to say without seeing [an example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) (Feel free to post a new question if you want). Ideally you want to join on a primary key- relying on the "order" of rows in the file [*can* be done](https://stackoverflow.com/questions/52318016/pyspark-add-sequential-and-deterministic-index-to-dataframe), but it's slow and not really what spark is designed to do. – pault May 20 '19 at 15:13
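A hedged sketch of the deterministic-index idea mentioned in the last comment: instead of monotonically_increasing_id(), an index built with RDD.zipWithIndex() assigns consecutive 0-based positions regardless of how the data is partitioned, so the two dataframes can be aligned row by row (df_ids and df_features are assumed names; this is one possible implementation, not the code from the linked answer):

def with_row_index(df, col_name="row_idx"):
    # zipWithIndex pairs every row with its consecutive 0-based position;
    # append that position as an extra column so it can be used as a join key
    return (df.rdd.zipWithIndex()
              .map(lambda pair: pair[0] + (pair[1],))
              .toDF(df.columns + [col_name]))

merged = (with_row_index(df_ids)
          .join(with_row_index(df_features), "row_idx", "inner")
          .drop("row_idx"))
merged.show()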