54

Is it possible, and if so what would be the most efficient and neat method, to add a column to a DataFrame?

More specifically, the column may serve as row IDs for the existing DataFrame.

In a simplified case, reading from a file and not tokenizing it, I can think of something like the snippet below (in Scala), but it fails with errors on the third line, and in any case doesn't look like the best route possible:

var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to dataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID")) // third line - this is where it fails
Oleg Shirokikh
  • What are the errors? That seems like the right method in the API. – Chet Apr 28 '15 at 18:41
  • @Chet, `withColumn` is supposed to work with the same dataframe - i.e. you can do something like `dataDF = dataDF.withColumn("ID", dataDF("ID").map(...))` with the column of `this` dataframe only, not with others – Oleg Shirokikh Apr 28 '15 at 22:00
  • @OlegShirokikh Ah, huh. It'd be nice if that restriction were documented. Interesting problem, then. The only mechanism I can see from the API otherwise would be to use a `join`. Not a bad candidate to suggest for an API enhancement. – Chet Apr 28 '15 at 22:23
  • @Chet - right, it's one of the core functionalities for any data frame. For example, they have some built-in functionality to update the schema of Parquet files. Obviously, it's a very expensive operation in a distributed environment, but IMO it should be there anyway – Oleg Shirokikh Apr 28 '15 at 22:48
  • Did you try to use a UDF? (something like `sqlContext.udf().register("...`) – Thomas Decaux Oct 19 '16 at 12:20

4 Answers

53

It's been a while since I posted the question and it seems that some other people would like to get an answer as well. Below is what I found.

So the original task was to append a column with row identifiers (basically, a sequence 1 to numRows) to any given data frame, so that row order/presence can be tracked (e.g. when you sample). This can be achieved by something along these lines:

import org.apache.spark.sql.Row

val rowRDD = sc.textFile(file).
  zipWithIndex().
  map { case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => Row.fromSeq(s.toSeq))
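
For completeness: the snippet above produces an RDD[Row] (assigned to rowRDD here), which still needs a schema before it can become a DataFrame again. A minimal sketch of that last step, assuming the file has two delimited fields (the field names are made up for illustration):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// "ID" holds the generated row index; the remaining fields come from the file.
val schema = StructType(Seq(
  StructField("ID", StringType, nullable = false),
  StructField("field1", StringType, nullable = true),
  StructField("field2", StringType, nullable = true)))

val indexedDF = sqlContext.createDataFrame(rowRDD, schema)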

Regarding the general case of appending any column to any data frame:

The "closest" to this functionality in Spark API are withColumn and withColumnRenamed. According to Scala docs, the former Returns a new DataFrame by adding a column. In my opinion, this is a bit confusing and incomplete definition. Both of these functions can operate on this data frame only, i.e. given two data frames df1 and df2 with column col:

val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL

So unless you can manage to transform a column of an existing dataframe into the shape you need, you can't use withColumn or withColumnRenamed to append arbitrary columns (standalone columns or columns from other data frames).

As noted in the comments above, the workaround may be to use a join - it would be pretty messy, although possible: attaching unique keys, as above with zipWithIndex, to both data frames or columns might work (a rough sketch follows below). Although efficiency is ...
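
A rough sketch of that join-based workaround - not a definitive implementation, and assuming the two frames have the same number of rows and no overlapping column names (all names here are illustrative):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Attach a synthetic join key to a data frame via zipWithIndex.
def withRowIndex(df: DataFrame): DataFrame = {
  val rdd = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField("_rowIdx", LongType, nullable = false))
  sqlContext.createDataFrame(rdd, schema)
}

// Key both sides, join on the key, then drop the key.
val combined = withRowIndex(df1).join(withRowIndex(df2), "_rowIdx").drop("_rowIdx")

Note the two zipWithIndex passes plus the shuffle for the join - exactly the efficiency concern above.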

It's clear that appending a column to a data frame is not an easy piece of functionality in a distributed environment, and there may be no very efficient, neat method for it at all. But I think it's still very important to have this core functionality available, even with performance warnings.

Oleg Shirokikh
  • There is some recent initiative to support a row_number function (among others) - [SPARK-7712](https://issues.apache.org/jira/browse/SPARK-7712). The JIRA ticket does not mention it explicitly, so you might want to look into the [related pull-request](https://github.com/apache/spark/pull/6278/files#diff-5dade93a3cce85de2ceeab02bd9d4ff6R531) – rchukh May 21 '15 at 12:46
  • Good answer! The column-appending functionality could be optimized if Spark knew that I am joining on the *sorted* key. That would be a big performance boost. – WeiChing 林煒清 Jul 22 '15 at 11:33
  • If you are looking to append an id column you should look at the function monotonically_increasing_id(), which can be used inside of withColumn. – Michael Armbrust Jan 20 '16 at 21:41
  • monotonically_increasing_id() has deep fragility issues and you must be very careful if you use it: http://stackoverflow.com/questions/35705038/how-do-i-add-an-persistent-column-of-row-ids-to-spark-dataframe/35706321 – Paul May 05 '16 at 10:49
30

Not sure if it works in Spark 1.3, but in Spark 1.5 I use withColumn:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

// lit() wraps a literal value in a Column, so it can be attached to any dataframe
df.withColumn("newName", lit("newValue"))

I use this when I need a value that is not related to the existing columns of the dataframe.

This is similar to @NehaM's answer, but simpler.

Tal Joffe
6

I took help from the answer above. However, I found it incomplete if we want to change a DataFrame, and the current APIs are a little different in Spark 1.6. zipWithIndex() returns a tuple of (Row, Long) which contains each row and its corresponding index. We can use it to create a new Row according to our needs.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val rdd = df.rdd.zipWithIndex()
             .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)) ++ df.schema.fields)
sqlContext.createDataFrame(rdd, newstructure).show

I hope this will be helpful.

NehaM
4

You can use row_number with a Window function as below to get a distinct ID for each row in a dataframe.

df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))

You can also use monotonically_increasing_id for the same purpose:

df.withColumn("ID", monotonically_increasing_id())

And there are some other ways too, such as zipWithIndex on the underlying RDD, as shown in the answers above.

Ramesh Maharjan