2

This question concerns processing a large dataset of observations in time. Later-stage work requires a common time step between observations, but in practice, the raw data often misses timesteps. Given a time step (say 1 second), the objective of this question is, within the full range observed in the raw data, to add rows corresponding to any missing timesteps using Pyspark.

I've achieved this by:

  1. Generating a new sequence of time values using the min and max observed times and the assumed common time step in Python
  2. Creating a new Spark dataframe from this sequence, and joining this onto the raw data.

My question is whether there is a more efficient or natural way to solve this problem in Pyspark (or if not whether there are any obvious improvements to my approach)?

I'm specifically interested in whether this can be solved efficiently in Pyspark as opposed to in Spark with code in Java as in this question.

I've detailed my solution, as well as setup and creation of reproducible test data below.

My solution

spark = SparkSession \
.builder \
.appName("Spark StackOverflow Test") \
.getOrCreate()

df = spark.read\
.options(header=True, inferSchema=True)\
.csv('test_data.csv')

# find min and max observed times after timesteps have been subsampled
df.createOrReplaceTempView('test_view')
tmin = spark.sql('select min(date) from test_view').collect()[0]['min(date)']
tmax = spark.sql('select max(date) from test_view').collect()[0]['max(date)']

# create full second-by-second index
new_date_index = takewhile(lambda x: x <= tmax,
        date_seq_generator(tmin, datetime.timedelta(seconds=1)))

# create Spark dataframe for new time index
index_schema = StructType([StructField("date", StringType())])
time_rdd = sc.parallelize([datetime.datetime.strftime(t, '%Y-%m-%d %H:%M:%S')
                       for t in new_date_index])
df_dates = spark.createDataFrame(time_rdd.map(lambda s: s.split(',')),
                                 schema=index_schema)
# cast new index type from string to timestamp
df_dates = df_dates.withColumn("date", df_dates["date"].cast(TimestampType()))

# join the spark dataframes to reindex
reindexed = df_dates.join(df,
                      how='left',
                      on= df_dates.date == df.date).select([df_dates.date, df.foo])

Setup and creation of dummy reproducible data

Basic form:

                  date       foo
0  2018-01-01 00:00:00  0.548814
1  2018-01-01 00:00:01  0.715189
2  2018-01-01 00:00:02  0.602763
3  2018-01-01 00:00:03  0.544883
4  2018-01-01 00:00:04  0.423655
5  2018-01-01 00:00:05  0.645894
6  2018-01-01 00:00:08  0.963663
...

Code:

import datetime
import pandas as pd
import numpy as np
from itertools import takewhile
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import col

# set seed for data
np.random.seed(0)

def date_seq_generator(start, delta):
    """
    Generator function for time observations.

    :param start: datetime start time
    :param delta: timedelta between observations
    :returns: next time observation
    """
    current = start - delta
    while True:
        current += delta
        yield current

def to_datetime(datestring):
    """Convert datestring to correctly-formatted datetime object."""
    return datetime.datetime.strptime(datestring, '%Y-%m-%d %H:%M:%S')

# choose an arbitrary time period
start_time = to_datetime('2018-01-01 00:00:00')
end_time = to_datetime('2018-01-02 00:00:00')

# create the full time index between the start and end times
initial_times = list(takewhile(lambda x: x <= end_time,
            date_seq_generator(start_time, datetime.timedelta(seconds=1))))

# create dummy dataframe in Pandas
pd_df = pd.DataFrame({'date': initial_times,
                      'foo': np.random.uniform(size =len(initial_times))})

# emulate missing time indices
pd_df = pd_df.sample(frac=.7)

# save test data
pd_df.to_csv('test_data.csv', index=False)
twolffpiggott
  • 1,063
  • 8
  • 13
  • 1
    Possible duplicate of [Filling gaps in timeseries Spark](https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark) – Alper t. Turker May 30 '18 at 15:58

1 Answers1

0

Complete date on Spark with Scala:

    import org.joda.time._
    import org.joda.time.format._
    import org.joda.time.format.DateTimeFormat
    import org.joda.time.DateTime
    import org.joda.time.Days
    import org.joda.time.Duration
    import org.apache.spark.sql.functions._
    import org.joda.time.LocalDate

      def dateComplete(dataFrameDate0: DataFrame, colName: String): DataFrame ={  
    def dayIterator(start: LocalDate, end: LocalDate) = Iterator.iterate(start)(_ plusDays 1) takeWhile (_ isBefore end)

    def dateSeries( date1 : String,date2 : String) : Array[String]= {
    val fromDate = new LocalDate(date1)
    val toDate = new LocalDate(date2)
    val series = dayIterator(fromDate,toDate).toArray
    val arr = series.map(a => a.toString())
    arr
    }
    val rangos = dataFrameDate0.agg(min($"invoicedate").as("minima_fecha"),         
    max($"invoicedate").as("maxima_fecha") )
    val serie_date = spark.sparkContext.parallelize(dateSeries( 
    rangos.select("minima_fecha", "maxima_fecha").take(1)(0)(0).toString, 
    rangos.select("minima_fecha", "maxima_fecha").take(1)(0)(1).toString )).toDF(colName)
    serie_date.join(dataFrameDate0, Seq(colName), "left")
    }

    val pivoteada=dateComplete(prod_group_day,"invoicedate").groupBy("key_product").pivot("invoicedate").agg(sum("cantidad_prod").as("cantidad"))
Mat.cort
  • 37
  • 6