18

I have a DataFrame that looks something like this. I want to operate on the day of the date_time field.

root
 |-- host: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: timestamp (nullable = true)

I tried to add a column to extract the day. So far my attempts have failed.

df = df.withColumn("day", df.date_time.getField("day"))

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type TimestampType;

This has also failed

df = df.withColumn("day", df.select("date_time").map(lambda row: row.date_time.day))

AttributeError: 'PipelinedRDD' object has no attribute 'alias'

Any idea how this can be done?

Wai Yip Tung

2 Answers

33

You can use a simple map:

from pyspark.sql import Row

df.rdd.map(lambda row:
    Row(*(row.__fields__ + ["day"]))(*(row + (row.date_time.day, )))
)
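
This returns an RDD of Rows rather than a DataFrame. If you need a DataFrame again, you can rebuild one from the result (a minimal sketch; mapped and df_with_day are just illustrative names for the RDD produced by the map above and the rebuilt DataFrame):

mapped = df.rdd.map(lambda row:
    Row(*(row.__fields__ + ["day"]))(*(row + (row.date_time.day, )))
)
# Column names and types are inferred from the Rows
df_with_day = sqlContext.createDataFrame(mapped)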

Another option is to register a function and run a SQL query:

sqlContext.registerFunction("day", lambda x: x.day)
sqlContext.registerDataFrameAsTable(df, "df")
sqlContext.sql("SELECT *, day(date_time) as day FROM df")

Finally, you can define a udf like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

day = udf(lambda date_time: date_time.day, IntegerType())
df.withColumn("day", day(df.date_time))

EDIT:

Actually, if you use raw SQL, the day function is already defined (at least in Spark 1.4), so you can omit the udf registration. Spark SQL also provides a number of other date processing functions, including year, month, dayofmonth, hour, minute, second, next_day, and datediff.
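
For example, once the table is registered, the built-in function can be used directly, with no registerFunction call (a minimal sketch of the raw SQL variant above):

sqlContext.registerDataFrameAsTable(df, "df")
sqlContext.sql("SELECT *, day(date_time) AS day FROM df").show()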

It is also possible to use simple date expressions like:

current_timestamp() - expr("INTERVAL 1 HOUR")

It means you can build relatively complex queries without passing data to Python. For example:

from pyspark.sql.functions import (
    col, datediff, expr, lit, next_day, unix_timestamp)

df = sc.parallelize([
    (1, "2016-01-06 00:04:21"),
    (2, "2016-05-01 12:20:00"),
    (3, "2016-08-06 00:04:21")
]).toDF(["id", "ts_"])

now = lit("2016-06-01 00:00:00").cast("timestamp") 
five_months_ago = now - expr("INTERVAL 5 MONTHS")

(df
    # Cast string to timestamp
    # For Spark 1.5 use cast("double").cast("timestamp")
    .withColumn("ts", unix_timestamp("ts_").cast("timestamp"))
    # Find all events in the last five months
    .where(col("ts").between(five_months_ago, now))
    # Find first Sunday after the event
    .withColumn("next_sunday", next_day(col("ts"), "Sun"))
    # Compute difference in days
    .withColumn("diff", datediff(col("ts"), col("next_sunday"))))
zero323
  • There are many columns and I only want to add one more. The map method may be too cumbersome to list all existing columns. I will try the register function way. thank you. – Wai Yip Tung Jun 23 '15 at 22:59
  • You don't have to list all the existing columns in map. It is possible to simply recreate the row. I've updated the answer to reflect that. There are two problems with this approach, though. It returns an RDD of Rows, not a DataFrame, and it is most likely slower than an optimized SQL query. – zero323 Jun 23 '15 at 23:27
  • Defining a udf seems to be the cleanest way I found so far. Added to the answer. – zero323 Jun 25 '15 at 00:28
0

from pyspark.sql import functions as F

res = df.withColumn("dayofts", F.dayofmonth("ts_"))
res.show()
rajashree