
Let's say I have the following dataframe with a non-standard timestamp column that is not in datetime format. I need to add a new column that converts it into a 24-hour-based timestamp for time-series visualization purposes:

df['timestamp(24hrs)'] = round(df['timestamp(sec)'] / (24*3600))

and get this:

+----------------+----+-----+
|timestamp(24hrs)|User|count|
+----------------+----+-----+
|0.0             |U100|435  |
|1.0             |U100|1091 |
|2.0             |U100|992  |
|3.0             |U100|980  |
|4.0             |U100|288  |
|8.0             |U100|260  |
|9.0             |U100|879  |
|10.0            |U100|875  |
|11.0            |U100|911  |
|13.0            |U100|628  |
|14.0            |U100|642  |
|16.0            |U100|631  |
|17.0            |U100|233  |
 ...               ...  ...
|267.0           |U100|1056 |
|269.0           |U100|878  |
|270.0           |U100|256  |
+----------------+----+-----+
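As a quick plain-Python sanity check of the bucket arithmetic (assuming the intended divisor is 24*3600 = 86,400 seconds per day, matching the `timestamp(24hrs)` column name):

```python
# Sanity check of the 24-hour bucket arithmetic in plain Python
# (assumes the intended divisor is 24*3600 = 86400 seconds per day).
SECONDS_PER_DAY = 24 * 3600

def to_24h_bucket(seconds):
    """Map a raw seconds timestamp to its 24-hour bucket index."""
    return float(round(seconds / SECONDS_PER_DAY))

print(to_24h_bucket(86400))        # 1.0  (exactly one day in)
print(to_24h_bucket(270 * 86400))  # 270.0 (matches the last bucket above)
```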

Now I noticed that some records' timestamps are missing, and I need to impute the missing rows:

  • timestamp(24hrs) in continuous order
  • count filled with 0

Expected output:

+----------------+----+-----+
|timestamp(24hrs)|User|count|
+----------------+----+-----+
|0.0             |U100|435  |
|1.0             |U100|1091 |
|2.0             |U100|992  |
|3.0             |U100|980  |
|4.0             |U100|288  |
|5.0             |U100|0    |
|6.0             |U100|0    |
|7.0             |U100|0    |
|8.0             |U100|260  |
|9.0             |U100|879  |
|10.0            |U100|875  |
|11.0            |U100|911  |
|12.0            |U100|0    |
|13.0            |U100|628  |
|14.0            |U100|642  |
|15.0            |U100|0    |
|16.0            |U100|631  |
|17.0            |U100|233  |
 ...               ...  ...
|267.0           |U100|1056 |
|268.0           |U100|0    |
|269.0           |U100|878  |
|270.0           |U100|256  |
+----------------+----+-----+

Any idea how I can do this? Based on this answer for a standard timestamp, I imagine I need to create a new timestamp(24hrs) column spanning the start and end of the existing one and do a left join() & crossJoin(), but I haven't managed it yet.

I've tried the following unsuccessfully:

import pyspark.sql.functions as F

all_dates_df = df.selectExpr(
    "sequence(min(timestamp(24hrs)), max(timestamp(24hrs)), interval 1 hour) as hour"
).select(F.explode("timestamp(24hrs)").alias("timestamp(24hrs)"))

all_dates_df.show()

result_df = all_dates_df.crossJoin(
    df.select("UserName").distinct()
).join(
    df, 
    ["count", "timestamp(24hrs)"],
    "left"
).fillna(0)

result_df.show()
Mario

1 Answer


Spark's `sequence` function works on integers, not doubles, so the column needs to be cast to integer first and then cast back to double (if you want to retain the double type).

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType

df_seq = (df.withColumn('time_int', F.col('timestamp(24hrs)').cast(IntegerType()))
          .select(F.explode(F.sequence(F.min('time_int'), F.max('time_int'))).alias('timestamp(24hrs)'))
          .select(F.col('timestamp(24hrs)').cast(DoubleType())))

df = (df_seq.crossJoin(df.select("User").distinct())
      .join(df, on=['User', 'timestamp(24hrs)'], how='left')
      .fillna(0))
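For a quick sanity check of the same fill-with-zero logic outside Spark, here is a miniature pandas analogue (sample rows are made up, and a single user `U100` is assumed so the cross join can be skipped):

```python
import pandas as pd

# Miniature pandas analogue of the Spark recipe above (made-up sample rows,
# single user assumed): build the full hour range, left-join, fill gaps with 0.
df = pd.DataFrame({
    "timestamp(24hrs)": [0.0, 1.0, 4.0],
    "User": ["U100", "U100", "U100"],
    "count": [435, 1091, 288],
})

# Full range of buckets from 0 to the observed maximum.
full = pd.DataFrame({
    "timestamp(24hrs)": [float(t) for t in range(int(df["timestamp(24hrs)"].max()) + 1)]
})

result = full.merge(df, on="timestamp(24hrs)", how="left")
result["User"] = result["User"].fillna("U100")
result["count"] = result["count"].fillna(0).astype(int)

print(result["count"].tolist())  # [435, 1091, 0, 0, 288]
```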
Emma
  • Thanks for your input. I have 2 questions: 1- What if I just have the `timestamp(24hrs)` and `count` columns (without the `User` column); what would the proposed solution look like then? (Here, this is what they call the [look-up](https://stackoverflow.com/a/74490625/10452700) process.) 2- Is it possible to directly use a fixed integer for the endpoint, like `.select(F.explode(F.sequence(F.min('time_int'), 270)))`? – Mario Feb 03 '23 at 03:01
  • 1. You can skip the `crossJoin`. 2. Yes, but you need to wrap the constant value with `F.lit`. – Emma Feb 03 '23 at 15:14
  • About Q2, I used `.select(F.explode(F.sequence(F.lit(0), F.lit(273))).alias('timestamp(24hrs)'))`, but it strangely appends the dataset several times! Maybe this is due to omitting `crossJoin()`, but I included `.dropDuplicates(['timestamp(24hrs)'])` and `.orderBy('timestamp(24hrs)', ascending=True)` to ensure duplicates are removed and the order is fixed for visualization purposes. – Mario Feb 04 '23 at 03:08
  • Duplicates probably come from your original data (`sequence` won't generate duplicates), and in that case it is usually better to `dropDuplicates` before the `join`. – Emma Feb 06 '23 at 14:27
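The last point can be illustrated with a tiny pandas sketch (made-up values): a duplicated timestamp in the source multiplies rows after a left join unless it is dropped first.

```python
import pandas as pd

# Toy illustration of why to deduplicate before joining (made-up values):
# a duplicated timestamp in the source multiplies rows after a left join.
src = pd.DataFrame({"timestamp(24hrs)": [0.0, 0.0, 1.0],
                    "count": [435, 435, 1091]})
hours = pd.DataFrame({"timestamp(24hrs)": [0.0, 1.0, 2.0]})

naive = hours.merge(src, on="timestamp(24hrs)", how="left")
deduped = hours.merge(src.drop_duplicates(subset=["timestamp(24hrs)"]),
                      on="timestamp(24hrs)", how="left")

print(len(naive))    # 4  (bucket 0.0 appears twice)
print(len(deduped))  # 3
```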