
I would like to create a pyspark dataframe composed of a list of datetimes with a specific frequency.

Currently I'm using the approach below, which seems quite cumbersome, and I'm pretty sure there are better ways:

import datetime as dt

import numpy as np
import pandas as pd
from pyspark.sql import functions as sf
from pyspark.sql.types import IntegerType

# Define date range
START_DATE = dt.datetime(2019, 8, 15, 20, 30, 0)
END_DATE = dt.datetime(2019, 8, 16, 15, 43, 0)

# Generate date range with pandas
timerange = pd.date_range(start=START_DATE, end=END_DATE, freq='15min')
# Convert to unix timestamps (seconds)
timestamps = [int(x) for x in timerange.values.astype(np.int64) // 10 ** 9]

# Create a pyspark dataframe from the above timestamps
(spark.createDataFrame(timestamps, IntegerType())
    .withColumn('value_date', sf.from_unixtime('value'))
    .drop('value')
    .withColumnRenamed('value_date', 'date').show())

which outputs

+-------------------+
|               date|
+-------------------+
|2019-08-15 20:30:00|
|2019-08-15 20:45:00|
|2019-08-15 21:00:00|
|2019-08-15 21:15:00|
|2019-08-15 21:30:00|
|2019-08-15 21:45:00|
|2019-08-15 22:00:00|
|2019-08-15 22:15:00|
|2019-08-15 22:30:00|
|2019-08-15 22:45:00|
|2019-08-15 23:00:00|
|2019-08-15 23:15:00|
|2019-08-15 23:30:00|
|2019-08-15 23:45:00|
|2019-08-16 00:00:00|
|2019-08-16 00:15:00|
|2019-08-16 00:30:00|
|2019-08-16 00:45:00|
|2019-08-16 01:00:00|
|2019-08-16 01:15:00|
+-------------------+

Can you suggest a smarter way to achieve this?

Thanks

Edit:

This seems to work

(spark.sql('SELECT sequence({start_date}, {end_date}, 60*15) as timestamp_seq'.format(
    start_date=int(START_DATE.timestamp()), end_date=int(END_DATE.timestamp())
)).withColumn('timestamp', sf.explode('timestamp_seq'))
.select(sf.col('timestamp').cast('timestamp').alias('datetime'))).show()

but I haven't been able to make it work with the datetime objects directly, i.e. without first converting them to unix timestamps.
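In case it's useful, here is a minimal sketch of the same idea without the unix-timestamp conversion, assuming Spark 2.4+ where sequence() accepts timestamp bounds with an interval step (START_DATE and END_DATE are the datetimes defined above):

# Hypothetical variant: pass the bounds as timestamp strings and let
# sequence() step by a 15-minute interval instead of 60*15 seconds.
(spark.sql(
    "SELECT explode(sequence(to_timestamp('{start}'), to_timestamp('{stop}'), "
    "interval 15 minutes)) AS datetime".format(start=START_DATE, stop=END_DATE)
).show())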

crash
  • By smarter, do you mean a solution which utilizes the cluster resources? Have a look at this [solution](https://stackoverflow.com/a/55199718/6664872) or, if you are using Spark 2.4, at this [solution](https://stackoverflow.com/a/57291174/6664872). – cronoik Aug 17 '19 at 22:07
  • Thanks @cronoik, that worked somehow, see the updated question. However, I didn't manage to make it work with the regular syntax using datetimes without converting them to timestamps. Any suggestion on how to achieve this? – crash Aug 18 '19 at 06:32

1 Answer


Here's a solution working on Spark 2.4.3 and Python 3.6.8:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Python version 3.6.8 (default, Dec 30 2018 18:50:55)
SparkSession available as 'spark'.
>>> from pyspark.sql import functions as F
>>> def generate_dates(spark,range_list,interval=60*60*24,dt_col="date_time_ref"): # TODO: attention to sparkSession
...     """
...     Create a Spark DataFrame with a single column named dt_col and a range of dates within a specified interval (start and stop included).
...     With hourly data, the timestamps end at 23:00 of the stop day.
...
...     :param spark: SparkSession or sqlContext depending on environment (server vs local)
...     :param range_list: array of strings formatted as "2018-01-20" or "2018-01-20 00:00:00"
...     :param interval: number of seconds (frequency), output from get_freq()
...     :param dt_col: string with date column name. Date column must be TimestampType
...
...     :returns: df from range
...     """
...     start,stop = range_list
...     temp_df = spark.createDataFrame([(start, stop)], ("start", "stop"))
...     temp_df = temp_df.select([F.col(c).cast("timestamp") for c in ("start", "stop")])
...     temp_df = temp_df.withColumn("stop",F.date_add("stop",1).cast("timestamp"))
...     temp_df = temp_df.select([F.col(c).cast("long") for c in ("start", "stop")])
...     start, stop = temp_df.first()
...     return spark.range(start,stop,interval).select(F.col("id").cast("timestamp").alias(dt_col))
...
>>> date_range = ["2018-01-20 00:00:00","2018-01-23 00:00:00"]
>>> generate_dates(spark,date_range)
DataFrame[date_time_ref: timestamp]
>>> generate_dates(spark,date_range).show()
+-------------------+
|      date_time_ref|
+-------------------+
|2018-01-20 00:00:00|
|2018-01-21 00:00:00|
|2018-01-22 00:00:00|
|2018-01-23 00:00:00|
+-------------------+
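As a usage note (my adaptation, not part of the original answer): for the 15-minute frequency from the question, the interval argument can be set to 15*60, though because of date_add("stop", 1) the range runs to the end of the stop day rather than stopping exactly at END_DATE:

# Hypothetical call for a 15-minute frequency; the last timestamp generated is
# 23:45:00 on the stop day because "stop" is rounded up to the next midnight.
generate_dates(spark, ["2019-08-15 20:30:00", "2019-08-16 15:43:00"], interval=15 * 60).show()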

Honestly, I think your first approach (pd.date_range -> spark.createDataFrame()) is the best one, since it lets pandas handle everything related to DST. Just don't convert the Python timestamp objects to int; convert them to str instead, and then cast the column from StringType to TimestampType.
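For what it's worth, a minimal sketch of that variant (assuming the START_DATE, END_DATE and spark session from the question) could look like this:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Let pandas build the range, pass the timestamps as strings,
# then cast the column to TimestampType on the Spark side.
timerange = pd.date_range(start=START_DATE, end=END_DATE, freq='15min')
(spark.createDataFrame([str(ts) for ts in timerange], StringType())
    .select(F.col('value').cast('timestamp').alias('date'))
    .show())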

ndricca