79

I want to change the names of two columns using the Spark withColumnRenamed function. Of course, I can write:

data = sqlContext.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
       .withColumnRenamed('x1','x3')
       .withColumnRenamed('x2', 'x4'))

but I want to do this in one step (having a list/tuple of new names). Unfortunately, neither this:

data = data.withColumnRenamed(['x1', 'x2'], ['x3', 'x4'])

nor this:

data = data.withColumnRenamed(('x1', 'x2'), ('x3', 'x4'))

works. Is it possible to do it that way?

Mark Rotteveel
user2280549

  • The accepted answer is efficient, but watch out for the other answers that suggest calling `withColumnRenamed` multiple times. The `withColumnRenamed` approach should be avoided for [reasons outlined in this blog post](https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015). See my answer for more detail. – Powers Jul 19 '20 at 22:14

12 Answers

123

It is not possible to use a single withColumnRenamed call.

  • You can use the DataFrame.toDF method*

    data.toDF('x3', 'x4')
    

    or

    new_names = ['x3', 'x4']
    data.toDF(*new_names)
    
  • It is also possible to rename with a simple select:

    from pyspark.sql.functions import col
    
    mapping = dict(zip(['x1', 'x2'], ['x3', 'x4']))
    data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])
    

Similarly in Scala you can:

  • Rename all columns:

    val newNames = Seq("x3", "x4")
    
    data.toDF(newNames: _*)
    
  • Rename from mapping with select:

    val mapping = Map("x1" -> "x3", "x2" -> "x4")
    
    data.select(
      data.columns.map(c => data(c).alias(mapping.getOrElse(c, c))): _*
    )
    

    or foldLeft + withColumnRenamed

    mapping.foldLeft(data){
      case (data, (oldName, newName)) => data.withColumnRenamed(oldName, newName) 
    }
    

* Not to be confused with RDD.toDF, which is not a variadic function and takes column names as a list.
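
For illustration, a minimal sketch of the difference (assuming a SparkContext named sc is available):

    rdd = sc.parallelize([(1, 2), (3, 4)])

    rdd.toDF(['x3', 'x4'])   # RDD.toDF takes the names as a single list
    data.toDF('x3', 'x4')    # DataFrame.toDF takes the names as varargs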

zero323
  • In your 3rd example `data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])`: how would you write `data.columns` if you do method chaining (something like `col` but referencing the dataframe)? – corianne1234 Apr 28 '20 at 13:49
  • Using `df.select` is the right way to do it in spark (scala/python). Check this out: https://stackoverflow.com/a/62728542/8551891 – Krunal Patel May 18 '21 at 15:39
30

I couldn't find an easy pyspark solution either, so I just built my own, similar to pandas' df.rename(columns={'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}).

import pyspark.sql.functions as F

def rename_columns(df, columns):
    if isinstance(columns, dict):
        return df.select(*[F.col(col_name).alias(columns.get(col_name, col_name)) for col_name in df.columns])
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

So your solution will look like data = rename_columns(data, {'x1': 'x3', 'x2': 'x4'})

If you want to chain your method calls, Spark 3.0 brought in pyspark.sql.DataFrame.transform, which you can use in the following way:

my_df.transform(lambda df: rename_columns(df, {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}))

It saves me some lines of code; I hope it helps you too.

proggeo
  • Updated this response 3 years later, with a more efficient solution, after I had to fix the performance of so many jobs using .withColumnRenamed() in a for loop. Sorry for the delay. – proggeo Oct 08 '21 at 09:01
18

Why do you want to perform it in one step? If you print the execution plan, it is actually done in a single step anyway:

data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
   .withColumnRenamed('x1','x3')
   .withColumnRenamed('x2', 'x4'))
data.explain()

OUTPUT

== Physical Plan ==
*(1) Project [x1#1548L AS x3#1552L, x2#1549L AS x4#1555L]
+- Scan ExistingRDD[x1#1548L,x2#1549L]

If you want to do it with a list of tuples, you can use a simple map function:

from pyspark.sql import functions as F

data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
new_names = [("x1","x3"),("x2","x4")]
data = data.select(list(
       map(lambda old, new: F.col(old).alias(new), *zip(*new_names))
       ))

data.explain()

It still has the same plan:

OUTPUT

== Physical Plan ==
*(1) Project [x1#1650L AS x3#1654L, x2#1651L AS x4#1655L]
+- Scan ExistingRDD[x1#1650L,x2#1651L]

Tushar Kolhe
10

You can also use a dictionary to iterate through the columns you want to rename.

Sample

# withColumnRenamed(existing_name, new_name): here the dict maps new_name -> existing_name
a_dict = {'sum_gb': 'sum_mbUsed', 'number_call': 'sum_call_date'}

for key, value in a_dict.items():
    df = df.withColumnRenamed(value, key)

Foxbat
9

This should work if you want to rename multiple columns by adding the same prefix to each column name:

from pyspark.sql import functions as f

PREFIX = 'my_prefix_'  # whichever prefix you need
df.select([f.col(c).alias(PREFIX + c) for c in df.columns])

datamonk3y

  • You wrote `for c in columns`, not `df.columns`; this gives me an error, but it would be so useful if it worked (for method chaining). How did you make this work? – corianne1234 Apr 28 '20 at 14:12
  • @corianne1234 For chaining and changing the column names use `transform`: `df.transform(lambda df2: df2.select([col(acol).alias(acol + '_tmp') for acol in df2.columns]))` – pettinato Oct 02 '20 at 18:20
6

I have this hack in all of my pyspark programs:

import pyspark
def rename_sdf(df, mapper={}, **kwargs_mapper):
    ''' Rename column names of a dataframe
        mapper: a dict mapping from the old column names to new names
        Usage:
            df.rename({'old_col_name': 'new_col_name', 'old_col_name2': 'new_col_name2'})
            df.rename(old_col_name='new_col_name')
    '''
    for before, after in mapper.items():
        df = df.withColumnRenamed(before, after)
    for before, after in kwargs_mapper.items():
        df = df.withColumnRenamed(before, after)
    return df
pyspark.sql.dataframe.DataFrame.rename = rename_sdf

Now you can easily rename any spark dataframe in the pandas way!

df.rename({'old1':'new1', 'old2':'new2'})

Louis Yang
3

The accepted answer by zero323 is efficient. Most of the other answers should be avoided.

Here's another efficient solution that leverages the quinn library and is well suited for production codebases:

import quinn

df = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
def rename_col(s):
    mapping = {'x1': 'x3', 'x2': 'x4'}
    return mapping[s]
actual_df = df.transform(quinn.with_columns_renamed(rename_col))
actual_df.show()

Here's the resulting DataFrame:

+---+---+
| x3| x4|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

Let's take a look at the logical plans output by actual_df.explain(True) and verify that they're efficient:

== Parsed Logical Plan ==
'Project ['x1 AS x3#52, 'x2 AS x4#53]
+- LogicalRDD [x1#48L, x2#49L], false

== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false

== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false

== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#53L]

The parsed logical plan and physical plan are basically equal, so Catalyst isn't doing any heavy lifting to optimize the plan.

Calling withColumnRenamed multiple times should be avoided because it creates an inefficient parsed plan that needs to be optimized.

Let's look at an unnecessarily complex parsed plan:

def rename_columns(df, columns):
    for old_name, new_name in columns.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df

actual_df = rename_columns(df, {'x1': 'x3', 'x2': 'x4'})
actual_df.explain(True)

== Parsed Logical Plan ==
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
   +- LogicalRDD [x1#48L, x2#49L], false

== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
   +- LogicalRDD [x1#48L, x2#49L], false

== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#55L]
+- LogicalRDD [x1#48L, x2#49L], false

== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#55L]

Machavity
Powers
3

Since PySpark 3.4.0, you can use the withColumnsRenamed() method to rename multiple columns at once. It takes as input a map from existing column names to the desired new column names.

df = df.withColumnsRenamed({
    "x1": "x3",
    "x2": "x4"
})

The method renames both columns at the same time. Note that if a column (e.g., "x1") doesn't exist in the current DataFrame schema, no error is thrown; it is simply ignored.
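
For example (a small sketch; "nonexistent" is a made-up column name used only to show the behavior):

df = df.withColumnsRenamed({
    "x1": "x3",
    "nonexistent": "whatever"  # not in the schema, so silently ignored
})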

Daniele Cappuccio
1

You should use this function:

def spark_rename_from_dict(df, rename_dict):
    # columns not in rename_dict keep their old names
    newcols = [rename_dict.get(i, i) for i in df.columns]
    return df.toDF(*newcols)

Here your rename dict is a mapping over a subset of df.columns. This approach is recommended since it does not create multiple intermediate DataFrames.
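
A quick usage sketch (only 'x1' is renamed here; 'x2' keeps its name since it's not in the dict):

data = spark_rename_from_dict(data, {'x1': 'x3'})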

figs_and_nuts
0

The easiest way to do this is as follows:

Explanation:

  1. Get all columns in the pyspark dataframe using df.columns
  2. Create a list by looping through each column from step 1
  3. The list will output: col("col1").alias("col1_x"). Do this only for the required columns.
  4. *[list] will unpack the list for the select statement in pyspark

from pyspark.sql import functions as F

(df
 .select(*[F.col(c).alias(f"{c}_x") for c in df.columns])
 .toPandas().head()
)

Hope this helps

Naveenan
0
data.pandas_api().rename(columns=dict(x1="x3",x2="x4"))

out:

+---+---+
| x3| x4|
+---+---+
|  1|  2|
|  3|  4|
+---+---+
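
Note that pandas_api() returns a pandas-on-Spark DataFrame. If you need a regular Spark DataFrame back, you can convert it with to_spark(), e.g.:

data = (data.pandas_api()
            .rename(columns=dict(x1="x3", x2="x4"))
            .to_spark())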
G.G
0

If you want to rename all columns by appending the same text, without converting to pandas, you can do this:

from pyspark.sql import functions as F

# build a mapping old_name -> old_name + '_new' for every column
new_columns = {c: c + '_new' for c in df.columns}
df_rename = df.select([F.col(c).alias(new_columns.get(c, c)) for c in df.columns])