95

Looking at the new Spark DataFrame API, it is unclear whether it is possible to modify DataFrame columns.

How would I go about changing a value in row x column y of a dataframe?

In pandas this would be:

df.ix[x,y] = new_value

Edit: Consolidating what was said below, you can't modify the existing dataframe as it is immutable, but you can return a new dataframe with the desired modifications.

If you just want to replace a value in a column based on a condition, like np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
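
As a quick illustration, here is a sketch assuming a SparkSession named spark, with 'N/A' and 'unknown' standing in for replace_val and new_value:

from pyspark.sql import functions as F

# toy DataFrame with a single column named 'update_col'
df = spark.createDataFrame([('N/A',), ('ok',)], ['update_col'])

update_func = (F.when(F.col('update_col') == 'N/A', 'unknown')
                .otherwise(F.col('update_col')))
df.withColumn('new_column_name', update_func).show()
# the 'N/A' row becomes 'unknown'; the 'ok' row is left unchanged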

If you want to perform some operation on a column and create a new column that is added to the dataframe:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    # do your transformation here; as an illustration, upper-case a string value
    return col.upper()

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

If you want the new column to have the same name as the old column, you could add the additional step:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Luke
  • if you want to access the DataFrame by index, you need to build an index first. See, e.g. http://stackoverflow.com/questions/26828815/how-to-get-elemnt-by-index-in-spark-rdd-java. Or add an index column with your own index. – fanfabbb Mar 31 '15 at 09:38

5 Answers

80

While you cannot modify a column as such, you may operate on a column and return a new DataFrame reflecting that change. For that you'd first create a UserDefinedFunction implementing the operation to apply and then selectively apply that function to the targeted column only. In Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[
    udf(column).alias(name) if column == name else column
    for column in old_df.columns
])

new_df now has the same schema as old_df (assuming that old_df.target_column was of type StringType as well) but all values in column target_column will be new_value.

karlson
  • This is an actual answer to the problem, thanks! Yet the Spark jobs don't finish for me; all executors get lost. Can you think of an alternative way? I use it with a somewhat more complex UDF where I do transformations on strings. Is there no pandas-like syntax such as `new_df = old_df.col1.apply(lambda x: func(x))`? – fanfabbb Mar 31 '15 at 07:58
  • There is also: `new_df = old_df.withColumn('target_column', udf(df.name))` – fanfabbb Mar 31 '15 at 13:13
  • @fanfabbb Would you mind posting your UDF? I don't see why a complex string operation would result in lost executors. Maybe decreasing partition sizes might help, e.g. by increasing the number of partitions. – karlson Mar 31 '15 at 14:44
  • Can the lambda function be replaced with a normal Python function: (`def(x): bla bla return x`) in this line: `udf = UserDefinedFunction(lambda x: 'new_value', Stringtype())`? Somehow I couldn't get `udf` to work with a normally defined function. – Jason May 16 '15 at 22:05
  • Yes, that should work fine. Keep in mind that UDFs can only take columns as parameters. If you want to pass other data into the function, you have to partially apply it first (see the sketch after these comments). – karlson May 18 '15 at 06:16
  • thanks @karlson for your answer. Do you know if there is a way to specify new_value to be a variable? I am trying to copy the value from column 'true_val' over to 'target_column'. Both columns exist in the same dataframe. – Katya Willard Sep 10 '15 at 18:44
  • @KatyaHandler If you just want to duplicate a column, one way to do so would be to simply select it twice: `df.select([df[col], df[col].alias('same_column')])`, where `col` is the name of the column you want to duplicate. With the latest Spark release, a lot of the stuff I've used UDFs for can be done with the functions defined in `pyspark.sql.functions`. UDF performance in Pyspark is really poor, so that might really be worth looking into: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions – karlson Sep 23 '15 at 19:48
  • @karlson, thanks. Yes my solution ended up working out of the bag with the function `withColumn()`. I found UDF performance to be too slow for my dataset (~ 1 terabyte). – Katya Willard Sep 23 '15 at 20:44
  • It is `StringType`, not `Stringtype`, in `udf = UserDefinedFunction(lambda x: 'new_value', Stringtype())` – Namit Juneja Feb 16 '17 at 05:31
  • Instead of selectively applying that function to the targeted column only, is it possible to apply it only to the non-null values of that column? I tried filling nulls with .fillna(), but I still get back TypeError: NoneType has no len(). – Swan87 Jun 13 '17 at 12:27
  • @Swan87 Why not implement the function such that it is a noop if the argument is `None`, e.g. `lambda x: len(x) if x is not None else x`? – karlson Jun 13 '17 at 12:53
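
Following up on the comment about partially applying a function before wrapping it in a UDF, here is a minimal sketch (the function, suffix, and column names are made up purely for illustration):

from functools import partial
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

# hypothetical transformation: append a suffix, guarding against nulls
def tag_value(x, suffix):
    return None if x is None else x + suffix

# fix the extra 'suffix' argument first, then wrap the resulting one-argument callable
tag_udf = UserDefinedFunction(partial(tag_value, suffix='_checked'), StringType())
new_df = old_df.withColumn('tagged_col', tag_udf('target_column'))
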
59

Commonly, when updating a column, we want to map an old value to a new value. Here's a way to do that in pyspark without UDFs:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col] == old_value, new_value)
     .otherwise(df[update_col]))
Paul
15

DataFrames are based on RDDs. RDDs are immutable structures and do not allow updating elements in place. To change values, you will need to create a new DataFrame by transforming the original one, either using the SQL-like DSL or RDD operations like map.
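
For instance, a minimal sketch of both routes (assuming a SQLContext named sqlContext and hypothetical columns id and price):

from pyspark.sql import functions as F

# DSL route: the original df is left untouched; a new DataFrame is returned
new_df = df.withColumn('price_with_tax', F.col('price') * 1.1)

# RDD route: map over the rows and rebuild a DataFrame from the result
new_rdd = df.rdd.map(lambda row: (row['id'], row['price'] * 1.1))
new_df2 = sqlContext.createDataFrame(new_rdd, ['id', 'price_with_tax'])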

A highly recommended slide deck: Introducing DataFrames in Spark for Large Scale Data Science.

maasg
  • What exactly is the DataFrame abstraction adding, then, that couldn't already be done in the same number of lines with a table? – Luke Mar 17 '15 at 22:25
  • " DataFrames introduce new simplified operators for filtering, aggregating, and projecting over large datasets. Internally, DataFrames leverage the Spark SQL logical optimizer to intelligently plan the physical execution of operations to work well on large datasets" - https://databricks.com/blog/2015/03/13/announcing-spark-1-3.html – maasg Mar 17 '15 at 22:40
13

Just as maasg says, you can create a new DataFrame from the result of a map applied to the old DataFrame. An example for a given DataFrame df with two rows:

val newDf = sqlContext.createDataFrame(df.map(row =>
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")))), df.schema)

Note that if the types of the columns change, you need to give it a correct schema instead of df.schema. Check out the API of org.apache.spark.sql.Row for available methods: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] Or using UDFs in Scala:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

and if the column name needs to stay the same you can rename it back:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
radek1st
5

Importing col and when from pyspark.sql.functions, and updating the fifth column to an integer (0, 1, 2) based on its string value ("string a", "string b", "string c"), producing a new DataFrame:

from pyspark.sql.functions import col, when 

data_frame_temp = data_frame.withColumn(
    "col_5",
    when(col("col_5") == "string a", 0)
    .when(col("col_5") == "string b", 1)
    .otherwise(2))
DHEERAJ