16

Q: Is there any way to merge two dataframes, or to copy a column of one dataframe to another, in PySpark?

For example, I have two Dataframes:

DF1              
C1                    C2                                                        
23397414             20875.7353   
5213970              20497.5582   
41323308             20935.7956   
123276113            18884.0477   
76456078             18389.9269 

The second dataframe:

DF2
C3                       C4
2008-02-04               262.00                 
2008-02-05               257.25                 
2008-02-06               262.75                 
2008-02-07               237.00                 
2008-02-08               231.00 

Then I want to add C3 of DF2 to DF1, like this:

New DF              
    C1                    C2          C3                                              
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

I hope this example was clear.

zero323
MrGildarts

10 Answers

16

A row number via a window function (solution 1) or zipWithIndex.map (solution 2) should help in this case.

Solution 1 : You can use window functions to get this kind of row index.

I would suggest adding a row number as an additional column, say columnindex, to the dataframe df1:

  DF1              
    C1                    C2                 columnindex                                             
    23397414             20875.7353            1
    5213970              20497.5582            2
    41323308             20935.7956            3
    123276113            18884.0477            4
    76456078             18389.9269            5

The second dataframe:

DF2
C3                       C4             columnindex
2008-02-04               262.00            1        
2008-02-05               257.25            2      
2008-02-06               262.75            3      
2008-02-07               237.00            4          
2008-02-08               231.00            5

Now do an inner join of df1 and df2 on columnindex, and that's all. You will get the output below.

Something like this:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# row_number() needs an ordering; ordering by monotonically_increasing_id()
# keeps the rows in their current (arbitrary) order within each dataframe
w = Window.orderBy(monotonically_increasing_id())

df1 = ....  # as shown above, df1

df2 = ....  # as shown above, df2

df11 = df1.withColumn("columnindex", row_number().over(w))
df22 = df2.withColumn("columnindex", row_number().over(w))

newDF = df11.join(df22, df11.columnindex == df22.columnindex, 'inner').drop(df22.columnindex)
newDF.show()



New DF              
    C1                    C2          C3                                              
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

Solution 2 : Another good way (probably this is the best :) ) is in Scala, which you can translate to PySpark:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

/**
 * Add a column index to a dataframe
 */
def addColumnIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add Column index
  df.rdd.zipWithIndex.map{case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex)},
  // Create schema
  StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
)

// Add index now...
val df1WithIndex = addColumnIndex(df1)
val df2WithIndex = addColumnIndex(df2)

 // Now time to join ...
val newone = df1WithIndex
  .join(df2WithIndex , Seq("columnindex"))
  .drop("columnindex")
Ram Ghadiyaram
10

I thought I would share the Python (PySpark) translation of solution #2 above from @Ram Ghadiyaram:

from pyspark.sql.functions import col

# Note: Python 2 syntax (tuple-unpacking lambda, xrange); see the next answer for Python 3
def addColumnIndex(df):
  # Create new column names
  oldColumns = df.schema.names
  newColumns = oldColumns + ["columnindex"]

  # Add column index
  df_indexed = df.rdd.zipWithIndex().map(lambda (row, columnindex): \
                                         row + (columnindex,)).toDF()

  # toDF() on plain tuples loses the original names, so rename the
  # auto-generated _1, _2, ... columns to the old names plus "columnindex"
  oldColumns = df_indexed.columns
  new_df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx],
                  newColumns[idx]), xrange(len(oldColumns)), df_indexed)
  return new_df

# Add index now...
df1WithIndex = addColumnIndex(df1)
df2WithIndex = addColumnIndex(df2)

#Now time to join ...
newone = df1WithIndex.join(df2WithIndex, "columnindex",
                           'inner').drop("columnindex")
Jed
8

For the Python 3 version:

from pyspark.sql.types import StructType, StructField, LongType

def with_column_index(sdf): 
    new_schema = StructType(sdf.schema.fields + [StructField("ColumnIndex", LongType(), False),])
    return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)

df1_ci = with_column_index(df1)
df2_ci = with_column_index(df2)
join_on_index = df1_ci.join(df2_ci, df1_ci.ColumnIndex == df2_ci.ColumnIndex, 'inner').drop("ColumnIndex")
Dyno Fu
4

This is based on @Jed's answer:

from pyspark.sql.functions import col
def addColumnIndex(df): 
    # Get old columns names and add a column "columnindex"
    oldColumns = df.columns
    newColumns = oldColumns + ["columnindex"]

    # Add Column index
    df_indexed = df.rdd.zipWithIndex().map(lambda (row, columnindex): \
                                         row + (columnindex,)).toDF()
    #Rename all the columns
    oldColumns = df_indexed.columns  
    new_df = reduce(lambda data, idx:data.withColumnRenamed(oldColumns[idx], 
                  newColumns[idx]), xrange(len(oldColumns)), df_indexed)   
    return new_df

# Add index now...
df1WithIndex = addColumnIndex(df1)
df2WithIndex = addColumnIndex(df2)

#Now time to join ...
newone = df1WithIndex.join(df2WithIndex, "columnindex",
                           'inner').drop("columnindex")
Zilong
    What is the difference from Jed's answer? If there is a difference, it should be explained, if not, this should not be posted as an answer.. – Melkor.cz Oct 07 '20 at 14:08
4

This answer solved it for me:

import pyspark.sql.functions as sparkf

# This will return a new DF with all the columns + id
res = df.withColumn('id', sparkf.monotonically_increasing_id())

Credit to Arkadi T

Gustavo
    This isn't going to work for joining two dataframes. The monotonically_increasing_id function doesn't return consecutive numbers. No guarantee the two dataframes will assign the same integer to rows in each df – justin cress Sep 12 '19 at 14:12
  • I don't agree that the index could be different in the two dataframes. Please have a look at my code below – Galuoises May 06 '20 at 18:22
  • @justincress that is true, to guarantee the same id one should add `.coalesce(1)` before using the `monotonically_increasing_id` – Gustavo Feb 05 '21 at 16:13
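A minimal sketch of the coalesce(1) approach mentioned in the comment above (my illustration, not part of the original answer; it assumes the question's df1 and df2 have the same number of rows):

import pyspark.sql.functions as sparkf

# Sketch only: on a single partition, monotonically_increasing_id() produces
# consecutive ids starting at 0, so both dataframes get matching ids
df1_id = df1.coalesce(1).withColumn('id', sparkf.monotonically_increasing_id())
df2_id = df2.coalesce(1).withColumn('id', sparkf.monotonically_increasing_id())

merged = df1_id.join(df2_id, 'id', 'inner').drop('id')
merged.show()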
1

Here is a simple example that can help you even if you have already solved the issue.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import spark.implicits._

// create the first dataframe
val df1 = spark.sparkContext.parallelize(Seq(1, 2, 1)).toDF("label1")

// create the second dataframe
val df2 = spark.sparkContext.parallelize(Seq((1.0, 12.1), (12.1, 1.3), (1.1, 0.3))).toDF("f1", "f2")

// combine both dataframes row by row
val combinedRow = df1.rdd.zip(df2.rdd).map({
  // convert both rows to Seq, concatenate them and return a single Row
  case (df1Data, df2Data) => Row.fromSeq(df1Data.toSeq ++ df2Data.toSeq)
})

// create a new schema from both dataframes' schemas
val combinedschema = StructType(df1.schema.fields ++ df2.schema.fields)

// create a new dataframe from the combined rows and the new schema
val finalDF = spark.sqlContext.createDataFrame(combinedRow, combinedschema)

finalDF.show
koiralo
  • Note : `Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition` – user3190018 Nov 19 '18 at 22:35
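A hedged PySpark sketch (my addition, not part of the original answer) of one way to avoid that exception: coalesce both sides to a single partition before zipping. This is only practical for small data and assumes both dataframes have the same total row count.

# Sketch only: zip() requires the same number of partitions and the same
# number of elements per partition; a single partition on each side
# satisfies that whenever the total row counts match
combined_rdd = df1.rdd.coalesce(1).zip(df2.rdd.coalesce(1)) \
                  .map(lambda rows: tuple(rows[0]) + tuple(rows[1]))

merged = combined_rdd.toDF(df1.columns + df2.columns)
merged.show()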
1

To merge columns from two different dataframes, you first have to create a column index and then join the two dataframes. Indeed, two dataframes are similar to two SQL tables: to connect them you have to join them.

If you don't care about the final order of the rows, you can generate the index column with monotonically_increasing_id().

Using the following code you can check that monotonically_increasing_id generates the same index column in both dataframes (at least up to a billion rows), so you won't have any error in the merged dataframe.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

sample_size = int(1e9)

sdf1 = spark.range(1, sample_size).select(F.col("id").alias("id1"))
sdf2 = spark.range(1, sample_size).select(F.col("id").alias("id2"))

sdf1 = sdf1.withColumn("idx", F.monotonically_increasing_id())
sdf2 = sdf2.withColumn("idx", F.monotonically_increasing_id())

sdf3 = sdf1.join(sdf2, 'idx', 'inner')
sdf3 = sdf3.withColumn("diff", F.col("id1") - F.col("id2")).select("diff")
sdf3.filter(F.col("diff") != 0).show()
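
Applied to the question's DF1 and DF2, a minimal sketch of the actual merge could look like this (my illustration, not part of the original answer; it assumes DF1 and DF2 are already loaded):

# Sketch: add the generated index to each dataframe, join on it, then drop it
df1_idx = DF1.withColumn("idx", F.monotonically_increasing_id())
df2_idx = DF2.withColumn("idx", F.monotonically_increasing_id())

new_df = df1_idx.join(df2_idx, "idx", "inner").drop("idx")
new_df.show()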
Galuoises
0

Expanding on Jed's answer, in response to Ajinkya's comment:

To get the original column names back, you need to rename from the columns of the newly indexed dataframe (tmp_cols) to the old names plus the index column. See my modified version of the function below:

def add_column_index(df):
    new_cols = df.schema.names + ['ix']
    ix_df = df.rdd.zipWithIndex().map(lambda (row, ix): row + (ix,)).toDF()
    tmp_cols = ix_df.schema.names
    return reduce(lambda data, idx: data.withColumnRenamed(tmp_cols[idx], new_cols[idx]), xrange(len(tmp_cols)), ix_df)
Mogsdad
MNav
0

Not the best way performance-wise.

df3 = df1.crossJoin(df2)
df3.show(3)
PIG
-1

You can use a combination of monotonically_increasing_id (guaranteed to always be increasing) and row_number (guaranteed to always give the same sequence). You cannot use row_number alone because it needs to be ordered by something. So here we order by monotonically_increasing_id. I am using Spark 2.3.1 and Python 2.7.13.

from pandas import DataFrame
from pyspark.sql.functions import (
    monotonically_increasing_id,
    row_number)
from pyspark.sql import Window

DF1 = spark.createDataFrame(DataFrame({
    'C1': [23397414, 5213970, 41323308, 123276113, 76456078],
    'C2': [20875.7353, 20497.5582, 20935.7956, 18884.0477, 18389.9269]}))

DF2 = spark.createDataFrame(DataFrame({
    'C3': ['2008-02-04', '2008-02-05', '2008-02-06', '2008-02-07', '2008-02-08']}))

DF1_idx = (
    DF1
    .withColumn('id', monotonically_increasing_id())
    .withColumn('columnindex', row_number().over(Window.orderBy('id')))
    .select('columnindex', 'C1', 'C2'))

DF2_idx = (
    DF2
    .withColumn('id', monotonically_increasing_id())
    .withColumn('columnindex', row_number().over(Window.orderBy('id')))
    .select('columnindex', 'C3'))

DF_complete = (
    DF1_idx
    .join(
        other=DF2_idx,
        on=['columnindex'],
        how='inner')
    .select('C1', 'C2', 'C3'))

DF_complete.show()

+---------+----------+----------+
|       C1|        C2|        C3|
+---------+----------+----------+
| 23397414|20875.7353|2008-02-04|
|  5213970|20497.5582|2008-02-05|
| 41323308|20935.7956|2008-02-06|
|123276113|18884.0477|2008-02-07|
| 76456078|18389.9269|2008-02-08|
+---------+----------+----------+
user2739472
    Would one of the downvoters like to give some constructive criticism here? I do not see why this is a bad answer. – user2739472 Dec 02 '19 at 13:48
    It wasn't me, but unfortunately, this method does not guarantee that the row sequence will be the same in the two DataFrames. I tried it with a 10-row DataFrame and the results were scrambled. Shame, because I can't use rdd methods (the rdd API is not whitelisted on the cluster I am using). – AltShift Nov 05 '20 at 04:53