
I'm using Spark 1.3.0 and Python. I have a DataFrame and I want to add an additional column that is derived from other columns, like this:

>>> old_df.columns
[col_1, col_2, ..., col_m]

>>> new_df.columns
[col_1, col_2, ..., col_m, col_n]

where

col_n = col_3 - col_4

How do I do this in PySpark?

oikonomiyaki

3 Answers


One way to achieve that is to use the withColumn method:

old_df = sqlContext.createDataFrame(sc.parallelize(
    [(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2'))

new_df = old_df.withColumn('col_n', old_df.col_1 - old_df.col_2)

Alternatively you can use SQL on a registered table:

old_df.registerTempTable('old_df')
new_df = sqlContext.sql('SELECT *, col_1 - col_2 AS col_n FROM old_df')
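
A minimal equivalent sketch, assuming pyspark.sql.functions.col is available (Spark 1.3+), writes the same expression without repeating the DataFrame variable:

from pyspark.sql.functions import col

# Same derived column, built with col() instead of referencing old_df directly
new_df = old_df.withColumn('col_n', col('col_1') - col('col_2'))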
zero323
  • Hey @zero323, what if col_1 and col_2 are both strings and I want col_n to be their concatenation? E.g. if col_1 is "zero" and col_2 is "323", col_n should be "zero323"? – Jason Feb 22 '16 at 19:59
  • Thanks @zero323. One question, though: with df.select(concat(col("k"), lit(" "), col("v"))), how can I create a third column here? – Jason Feb 22 '16 at 20:09
  • df.withColumn('Datetime', df.column1 + '' + df.column2) doesn't work – Jason Feb 22 '16 at 20:15
  • And it won't. You already have all the pieces, you just have to put them in the right place. – zero323 Feb 22 '16 at 20:18
  • Thanks. Got it to work using df = df.select("*", concat(col("col1"), lit(" "), col("col2")).alias('coln')) – Jason Feb 22 '16 at 20:32
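
Spelling out the concatenation from the comments as a minimal sketch, assuming two string columns col_1 and col_2 and pyspark.sql.functions.concat (available since Spark 1.5):

from pyspark.sql.functions import concat, col, lit

# Join the two string columns into col_n; lit(' ') could be inserted as a separator
new_df = old_df.select('*', concat(col('col_1'), col('col_2')).alias('col_n'))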

Additionally, we can use a udf:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)
old_df = sqlContext.createDataFrame(sc.parallelize(
    [(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2'))

# Wrap a plain Python lambda as a UDF that returns an integer
function = udf(lambda col1, col2: col1 - col2, IntegerType())
new_df = old_df.withColumn('col_n', function(col('col_1'), col('col_2')))
new_df.show()
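
With the sample rows above ((0, 1), (1, 3), (2, 5)), show() should print something like:

+-----+-----+-----+
|col_1|col_2|col_n|
+-----+-----+-----+
|    0|    1|   -1|
|    1|    3|   -2|
|    2|    5|   -3|
+-----+-----+-----+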
arker296

You have the following options for adding a new column:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([[1, 2], [3, 4]], ['col1', 'col2'])
df.show()

+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+

-- Using the method withColumn:

import pyspark.sql.functions as F

df.withColumn('col3', F.col('col2') - F.col('col1')) # col function

df.withColumn('col3', df['col2'] - df['col1']) # bracket notation

df.withColumn('col3', df.col2 - df.col1) # dot notation

-- Using the method select:

df.select('*', (F.col('col2') - F.col('col1')).alias('col3'))

The expression '*' returns all columns.

-- Using the method selectExpr:

df.selectExpr('*', 'col2 - col1 as col3')

-- Using SQL:

df.createOrReplaceTempView('df_view')

spark.sql('select *, col2 - col1 as col3 from df_view')

Result:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   1|
|   3|   4|   1|
+----+----+----+
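
As a side note, a minimal sketch assuming a newer Spark version (3.3+), where DataFrame.withColumns accepts a dict and adds several derived columns in one call:

import pyspark.sql.functions as F

# Assumes Spark 3.3+; adds two derived columns at once
df.withColumns({
    'col3': F.col('col2') - F.col('col1'),
    'col4': F.col('col1') + F.col('col2'),  # hypothetical second column for illustration
})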
Mykola Zotko