
I'm trying to convert a PySpark dataframe column from string format to date format. I've consulted quite a few other questions and answers, and below I show every line of code I've attempted (all commented out) along with all of my imports so far; each of those attempts comes from a resolved question where it did the trick for someone else with the same issue. My output keeps showing up with null values.

Update 1: apparently my issue was that I needed M/dd/yyyy, but now there's a new error: it only works for the first 5 entries, because right after them the format changes and I get Fail to parse '4/2/2012' in the new parser, which would need M/d/yyyy. Isn't there a way to just encompass M/d/yyyy, MM/dd/yyyy, and all the other possible combinations?

Update 2: after changing the date format in Excel to add leading zeroes (so a single-digit month or day now has two digits and all dates are standardized), PySpark still can't process it: Fail to parse '03/23/12'.
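If I'm reading the Spark 3 parser right, a single-letter pattern like M/d/yyyy accepts both one- and two-digit months and days, and mixed four- and two-digit years can be handled by trying one format per to_date() call and coalescing the results; setting spark.sql.legacy.timeParserPolicy to CORRECTED makes unparseable strings come back as null instead of raising. An untested sketch of that idea (the Date_parsed column name is made up, and a SparkSession named spark is assumed):

#sketch: parse whichever format matches, return null otherwise
from pyspark.sql.functions import coalesce, col, to_date

spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

amazon_train = amazon_train.withColumn(
    "Date_parsed",
    coalesce(
        to_date(col("Date"), "M/d/yyyy"),  # matches '3/23/2012' and '4/2/2012'
        to_date(col("Date"), "M/d/yy"),    # matches '03/23/12'
    ),
)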

Update 3: I'm now trying more complex functions and loops to parse the date strings. Here's new code using a map over the RDD, but converting the RDD back into a dataframe doesn't work; toDF() can't be used on the RDD (a sketch of getting back to a dataframe follows the code below):

#makes the rdd parse date strings
def cleanDates(date_time_str):
    #strptime validates the format; take the parts from the parsed object
    #(a datetime object has no split() method, so don't split it)
    date_time_obj = datetime.strptime(date_time_str, '%m/%d/%Y')
    month = date_time_obj.month
    day = date_time_obj.day
    year = date_time_obj.year
    return month, day, year

amazonDates = amazon_train.select("Date")
amazonDates.show()

rdd_amazon = amazonDates.rdd.map(lambda x: cleanDates(x['Date'])) #each x is a Row, so pull out the 'Date' string
#rdd_amazon.collect() #didn't work
#rdd_amazon.sc.parallelize() #didn't work
type(rdd_amazon) #pipeline rdd
sc.toDF(rdd_amazon) #sc = spark context


#rdd_amazon = amazon_mapping.toDF() #didn't work

#rdd_amazon = rdd_amazon.flatMap(lambda x: cleanDates(x))

#amazon_train = amazon_train.withColumn('Date', rdd_amazon) #didn't work

#amazon_train = amazon_train.withColumn("Date", to_date('Date', "MM/dd/yyyy")) #didn't work
#amazon_train.show() #didn't work
#amazon_train.printSchema() #didn't work
#amazon_train.groupBy(year("date").alias('Year')).agg({'Close': 'mean'}).show() #didn't work
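For the record, here's a minimal sketch of getting from the mapped RDD back to a dataframe, assuming a SparkSession named spark and the cleanDates function above (the Month/Day/Year column names are just placeholders):

#sketch: an RDD of tuples can be turned back into a dataframe
rdd_amazon = amazonDates.rdd.map(lambda row: cleanDates(row['Date']))

parsed_df = spark.createDataFrame(rdd_amazon, ['Month', 'Day', 'Year'])
#or, equivalently, once a SparkSession exists:
#parsed_df = rdd_amazon.toDF(['Month', 'Day', 'Year'])
parsed_df.show(5)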

The answers to these questions didn't work for me either:

How to change the column type from String to Date in DataFrames?

Why I get null results from date_format() PySpark function?

Here's the schema and the first 5 rows:

amazon_train.printSchema()
amazon_train.show(5)
root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)

+---------+----------+----------+----------+----------+----------+-------+
|     Date|      Open|      High|       Low|     Close| Adj Close| Volume|
+---------+----------+----------+----------+----------+----------+-------+
|3/23/2012|192.009995|196.199997|191.800003|195.039993|195.039993|5984000|
|3/26/2012|196.479996|202.970001|     195.5|202.869995|202.869995|7613700|
|3/27/2012|203.589996|209.850006|202.880005|205.440002|205.440002|9600800|
|3/28/2012|206.139999|     207.0|200.309998|201.160004|201.160004|6245000|
|3/29/2012|201.279999|205.309998|200.630005|204.610001|204.610001|5711200|
+---------+----------+----------+----------+----------+----------+-------+
only showing top 5 rows

I tried the commented-out lines below, expecting to convert the 'Date' column into an official date datatype, either by adding a new column or by converting it directly; it doesn't really matter which:

#None of these work:

amazon_train.select(col("Date"),to_date(col("Date"),"MM-dd-yyyy").alias("date")).show()
#amazon_train.select(to_date(amazon_train.Date, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
amazon_train.withColumn("New Date",expr("to_date(Date, yyyy-MM-dd)")).show()
amazon_train.withColumn(amazon_train.select(to_timestamp("Date", 'yyyy-MM-dd'))).alias('New Date').show()
amazon_train.select("Date").show()
amazon_train.Date = to_timestamp(amazon_train.Date, 'yyyy-MM-dd').alias('New Date').collect()
amazon_train = amazon_train.withColumn('New Date', to_date(unix_timestamp(col('Date'), 'MM-dd-yyyy').cast("timestamp")))
amazon_train = amazon_train.withColumn('col_with_date_format',sf.to_date(amazon_train.Date))
amazon_train = amazon_train.withColumn("Date", amazon_train["Date"].cast(DateType()))
amazon_train.select(date_format('Date', 'MM-dd-yyy').alias('newFormat')).show()
amazon_train.select(date_format(unix_timestamp("Date", "MM-dd-yyyy").cast("timestamp"), "MM-dd-yyyy")).show()
amazon_train.withColumn('New Date', F.date_format(F.to_date('Date', "MM/dd/yyyy"),'MM-dd-yyyy')).show()
F.date_format(F.to_date(amazon_train["Date"], "MM/dd/yyyy"), "MM-dd-yyyy")
amazon_train["Date"].cast(DateType())
amazon_train = amazon_train.withColumn("New Dates", date_format(to_date(col("Date"),"MM/dd/yyyy"),"MM-dd-yyyy"))
date_format(to_date(amazon_train.Date,"MM/dd/yyyy"),"MM/dd/yyyy")

The main issue: the date column keeps coming back null:

+----+----------+----------+----------+----------+----------+-------+
|date|      Open|      High|       Low|     Close| Adj Close| Volume|
+----+----------+----------+----------+----------+----------+-------+
|null|192.009995|196.199997|191.800003|195.039993|195.039993|5984000|
|null|196.479996|202.970001|     195.5|202.869995|202.869995|7613700|
|null|203.589996|209.850006|202.880005|205.440002|205.440002|9600800|
|null|206.139999|     207.0|200.309998|201.160004|201.160004|6245000|
|null|201.279999|205.309998|200.630005|204.610001|204.610001|5711200|
+----+----------+----------+----------+----------+----------+-------+
only showing top 5 rows

root
 |-- date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)

1 Answer


Alright so buckle up buckaroos, this one gets complicated

Steps 1 & 2: Alright, first make the Spark context for PySpark, add an SQLContext, and get your data into a dataframe, etc. (a minimal setup sketch follows the imports below).

#All imports
from pyspark.sql import SparkSession
from datetime import datetime
import dateparser
from pyspark.sql import Row, SQLContext
import functools
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import concat, lit, col
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import unix_timestamp, to_date
from pyspark.sql import functions as F
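The session and dataframe themselves aren't shown above; a minimal setup sketch, assuming the data sits in a local CSV (the file name here is hypothetical):

#setup sketch: build the session, keep the SparkContext for the SQLContext used later,
#and read the CSV with a header row and inferred column types
spark = SparkSession.builder.appName("amazon-dates").getOrCreate()
sc = spark.sparkContext

df = spark.read.csv("amazon_train.csv", header=True, inferSchema=True)
df.printSchema()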

Step 3: Then we select the Date column into a new, single-column dataframe

df_dates = df.select("Date")

Step 4: Then we pull the column into a Python list with collect(), using an RDD map() with a lambda to apply datetime.strptime() to each record of the 'Date' column

df_list = df_dates.rdd.map(lambda x: (datetime.strptime(x['Date'], "%m/%d/%Y"))).collect()

Step 5: Then we make a SQLContext from our Spark context (sc) and use a list comprehension to put the list back into a new, single-column dataframe

sqlContext = SQLContext(sc)
new_df = sqlContext.createDataFrame([(item,) for item in df_list], ['Datetimes'])
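SQLContext is a legacy entry point; if a SparkSession is already around, the same dataframe can be built from it directly (equivalent as far as I can tell):

#same idea without the legacy SQLContext
new_df = spark.createDataFrame([(item,) for item in df_list], ['Datetimes'])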

Step 6: Last, we select that dataframe's only column, add row indices to both dataframes, and merge the new dataframe with the original one (or just replace the original Date column instead of adding a new one). The row indices exist only so the two frames can be joined; drop them once they're merged.

just_dates_col = new_df.select('Datetimes')

# add row_index because no common columns, df = original dataframe
df2 = just_dates_col.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
df1 = df.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

df2 = df2.join(df1, on=["row_index"]).drop("row_index")
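Something like the following should confirm the merge worked and that 'Datetimes' is now a real datetime column, and it also runs the year-level aggregation the question was aiming for:

df2.printSchema()
df2.show(5)

#with a proper datetime column, the groupBy from the question works
df2.groupBy(year('Datetimes').alias('Year')).agg({'Close': 'mean'}).show()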