2

I have data like this:

+---+------+                                                                    
| id|   col|
+---+------+
|  1|210927|
|  2|210928|
|  3|210929|
|  4|210930|
|  5|211001|
+---+------+

I want the output like below:

+---+------+----------+
| id|   col|   t_date1|
+---+------+----------+
|  1|210927|27-09-2021|
|  2|210928|28-09-2021|
|  3|210929|29-09-2021|
|  4|210930|30-09-2021|
|  5|211001|01-10-2021|
+---+------+----------+   

I was able to get this using pandas and strptime. Below is my code:

from datetime import datetime

pDF = df.toPandas()
valuesList = pDF['col'].to_list()
modifiedList = list()

for i in valuesList:
    # parse the yyMMdd value and re-format it as dd-MM-yyyy
    modifiedList.append(datetime.strptime(i, "%y%m%d").strftime('%d-%m-%Y'))

pDF['t_date1'] = modifiedList

df = spark.createDataFrame(pDF)

Now, the main problem is that I want to avoid using pandas and Python lists, since I will be dealing with millions or even billions of records, and pandas slows the process down when it comes to big data.

I tried various methods in Spark like unix_timestamp, to_date, and timestamp with the format I need, but no luck, and since strptime only works with strings I can't use it directly on a column. I am not willing to create a UDF since they are slow too.

The main problem is identifying the exact year (i.e. the century for the two-digit year), which I wasn't able to do in Spark, but I want to implement this using Spark only. What needs to be changed? Where am I going wrong?

  • Have you tried using vectorized method like `pd.to_datetime(df["col"].astype(str), format="%y%m%d")` instead of looping with `datetime.strptime`? – Henry Yik Oct 23 '21 at 18:02
  • I don't want to use pandas since it doesn't work in a distributed way and I have to deal with billions of records. – whatsinthename Oct 23 '21 at 18:43
  • https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html you can use the Pandas API with Spark. I don't really understand your current code stub - you don't call a `spark` method until after you've transformed the data, so the date won't be calculated in parallel. Also you shouldn't call `to_list()` to operate on a Pandas Dataframe - map the column directly instead. – Ben Watson Oct 24 '21 at 07:59
  • strptime doesn't work on a column directly. It has to be a string. – whatsinthename Oct 24 '21 at 08:03
  • Yes and as @HenryYik said, that's why you need to use `pd.to_datetime()`. Look at https://stackoverflow.com/questions/26763344/convert-pandas-column-to-datetime. You should never be moving data out of a Pandas Dataframe. – Ben Watson Oct 24 '21 at 12:31
  • Okay, so you mean if I use this method, the data will be processed in parallel? Because I am talking about millions or billions of records. – whatsinthename Oct 24 '21 at 13:45
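
For what it's worth, the distributed pandas-on-Spark route mentioned in the comments above could look roughly like the sketch below. This is only a sketch, assuming Spark 3.2+ (where pyspark.pandas ships with Spark) and that col can be cast to string; unlike toPandas(), it keeps the data distributed.

import pyspark.pandas as ps

psdf = df.to_pandas_on_spark()   # stays distributed, unlike df.toPandas()
psdf['t_date1'] = (ps.to_datetime(psdf['col'].astype(str), format='%y%m%d')
                     .dt.strftime('%d-%m-%Y'))
sdf = psdf.to_spark()            # back to a regular Spark DataFrame
sdf.show()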

3 Answers

2

Did you use the correct format? Using yyMMdd with to_date for parsing, and dd-MM-yyyy with date_format for formatting, should work:

import pyspark.sql.functions as f
df.withColumn('t_date', f.date_format(f.to_date('col', 'yyMMdd'), 'dd-MM-yyyy')).show()

+---+------+----------+
| id|   col|    t_date|
+---+------+----------+
|  1|210927|27-09-2021|
|  2|210928|28-09-2021|
|  3|210929|29-09-2021|
|  4|210930|30-09-2021|
|  5|211001|01-10-2021|
+---+------+----------+

If col is not of string type, cast it to string first:

df.withColumn('t_date', f.date_format(f.to_date(f.col('col').cast('string'), 'yyMMdd'), 'dd-MM-yyyy')).show()
Psidom
  • If I change the value of the first record from `210927` to `920927`, I don't get the output `27-09-1992`. Instead, I get `27-09-2092`. Using pandas I was able to get this, but since it doesn't work in a distributed manner I have to avoid it. – whatsinthename Oct 23 '21 at 18:42
  • If you prefer pandas behavior, you can use `pandas_udf` in pyspark: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html – Psidom Oct 23 '21 at 18:44
  • Thanks for your help but I don't want to use UDF as well since they are slower as well. – whatsinthename Oct 23 '21 at 18:45
  • `pandas_udf` is pretty fast based on some of the benchmark here: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html – Psidom Oct 23 '21 at 18:51
  • But then again, you can always subtract 100 years from the date if it's in the future – Psidom Oct 23 '21 at 18:54
  • How can I implement this in a dynamic manner? – whatsinthename Oct 23 '21 at 18:55
  • Use [`when.otherwise`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.when.html#pyspark.sql.functions.when). Something like `when(year('t_date') > 2021, 't_date' - 100 years).otherwise('t_date')` – Psidom Oct 23 '21 at 19:01
  • Let me try it, and can you modify your answer? If it works for me, I will accept it. – whatsinthename Oct 23 '21 at 19:23
  • It is not robust, because if my code runs next year it will still subtract 100 years, which will become 1922. – whatsinthename Oct 23 '21 at 19:55
  • There are ways to get the current year. For instance, `today` from the datetime module? – Psidom Oct 23 '21 at 20:22
  • Yeah this might work. Let me try it tomorrow. – whatsinthename Oct 23 '21 at 20:25
  • I was able to subtract the year but wasn't able to retain the complete date. Please help me to retain the date. `df2 = df1.withColumn("t_date1",when(year(to_date(unix_timestamp(col('t_date'), 'dd-MM-yyyy').cast("timestamp"))) > year(current_date()), year(to_date(unix_timestamp(col("t_date"), 'dd-MM-yyyy').cast("timestamp")))-100).otherwise(col("t_date"))) ` – whatsinthename Oct 24 '21 at 13:40
  • I had to do the conversions since the column was a string, and I tried my best to subtract a year from the date in Spark but couldn't find any methods for this. – whatsinthename Oct 24 '21 at 13:40
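
Putting this comment thread together, a minimal sketch of the "roll future dates back a century" idea could look like the following. It reuses the yyMMdd parsing from the answer above and subtracts 100 years with add_months(..., -1200) so the full date is retained; df2 and the column names are just illustrative:

import pyspark.sql.functions as F

parsed = F.to_date(F.col('col').cast('string'), 'yyMMdd')

# if the parsed year lands in the future, assume it belongs to the previous
# century and roll it back 100 years (1200 months), keeping day and month
fixed = F.when(F.year(parsed) > F.year(F.current_date()),
               F.add_months(parsed, -1200)).otherwise(parsed)

df2 = df.withColumn('t_date1', F.date_format(fixed, 'dd-MM-yyyy'))
df2.show()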
2

According to the CPython implementation of Python's datetime.strptime:

# Open Group specification for strptime() states that a %y
# value in the range of [00, 68] is in the century 2000, while
# [69, 99] is in the century 1900
if year <= 68:
    year += 2000
else:
    year += 1900

It's easy enough to implement this with PySpark's when and otherwise:

from pyspark.sql import functions as F

(df
    # first two digits = two-digit year
    .withColumn('y', F.substring('col', 0, 2).cast('int'))
    # strptime century rule: 00-68 -> 2000s, 69-99 -> 1900s
    .withColumn('y', F
        .when(F.col('y') <= 68, F.col('y') + 2000)
        .otherwise(F.col('y') + 1900)
    )
    # rebuild yyyy-MM-dd from the resolved year plus the month/day groups
    .withColumn('t_date', F.concat('y', F.regexp_replace('col', r'(\d{2})(\d{2})(\d{2})', '-$2-$3')))
    .show()
)

# Output
# +---+------+----+----------+
# | id|   col|   y|    t_date|
# +---+------+----+----------+
# |  1|210927|2021|2021-09-27|
# |  2|910927|1991|1991-09-27|
# +---+------+----+----------+

Technically, you can argue all day about this approach (00-68 mapping to the 2000s and 69-99 to the 1900s), but it is the de facto "standard" strptime behavior, so I don't see anything wrong with using it here.
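
As a possible follow-up (not part of the original answer), the yyyy-MM-dd string built above can be pushed through to_date and date_format to get the dd-MM-yyyy layout from the question; df_fixed is a hypothetical name for the DataFrame produced by the snippet above:

# df_fixed: hypothetical name for the result of the chain above
result = df_fixed.withColumn(
    't_date1',
    F.date_format(F.to_date('t_date', 'yyyy-MM-dd'), 'dd-MM-yyyy')
)
result.show()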

pltc
0

Here is another way:

import pandas as pd

(df.assign(t_date1 = pd.to_datetime('20' + df['col'].astype(str),
                                    format='%Y%m%d').dt.strftime('%d-%m-%Y')))
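
A variant of the same idea (my addition, not part of the answer) is to let pandas apply the usual strptime century rule instead of hard-coding the '20' prefix:

# %y decides the century: 00-68 -> 2000s, 69-99 -> 1900s
df.assign(t_date1=pd.to_datetime(df['col'].astype(str), format='%y%m%d')
                    .dt.strftime('%d-%m-%Y'))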
rhug123