
I have a data set in the following format:

import numpy as np
import pandas as pd

# Create the data set

np.random.seed(42)

records = list()
for i in range(2):
    for j in range(2):        
        for k in range(500):
            t = np.random.randint(pd.Timestamp('2000-01-01').value, pd.Timestamp('2018-01-01').value)
            if np.random.rand() > .95: continue                
            ts = pd.Timestamp(t).strftime('%Y-%m-%d %H:%M:%S.%f')
            records.append( (i, j, np.random.rand(), ts) )

df = pd.DataFrame.from_records(records)
df.columns =['a_id', 'b_id', 'value', 'time']

Which looks like this:

      a_id  b_id     value                        time
0        0     0  0.156019  2007-09-28 15:12:24.260596
1        0     0  0.601115  2015-09-08 01:59:18.043399
2        0     0  0.969910  2012-01-10 07:51:29.662492
3        0     0  0.181825  2011-08-28 19:58:33.281289
4        0     0  0.524756  2015-11-15 14:18:17.398715
5        0     0  0.611853  2015-01-07 23:44:37.034322
6        0     0  0.366362  2008-06-21 11:56:10.529679
7        0     0  0.199674  2010-11-08 18:24:18.794838
8        0     0  0.046450  2008-04-27 02:36:46.026876

Here a_id and b_id together form the key for a sensor. This means the data frame has to be transformed as follows:

df_ = pd.pivot_table(df, index='time', columns=['a_id', 'b_id'], values='value')
df_.index = [pd.to_datetime(v) for v in df_.index]
df_ = df_.resample('1W').mean().ffill().bfill()

After resampling and filling the gaps, the data is in the desired format:

a_id               0                   1          
b_id               0         1         0         1
2000-01-09  0.565028  0.560434  0.920740  0.458825
2000-01-16  0.565028  0.146963  0.920740  0.217588
2000-01-23  0.565028  0.840872  0.920740  0.209690
2000-01-30  0.565028  0.046852  0.920740  0.209690
2000-02-06  0.565028  0.046852  0.704871  0.209690

Each column now contains the data of one sensor.

The problem is, I do not know how to do this in PySpark.

from pyspark.sql import functions as F

df_test = spark.createDataFrame(df) \
    .withColumn('time', F.to_timestamp('time'))
df_test.printSchema()

which prints the following schema:

root
 |-- a_id: long (nullable = true)
 |-- b_id: long (nullable = true)
 |-- value: double (nullable = true)
 |-- time: timestamp (nullable = true)

How can I transform df_test such that it has the same form as df_?

zero323
Stefan Falk
  • you want to group by date (not timestamp) and pivot on a_id and b_id? – eliasah Mar 27 '18 at 09:29
  • @eliasah Well, the group by should be flexible - here I resample on weeks but I might have to increase the resolution to days or hours. And yes, I pivot on `a_id` and `b_id`. – Stefan Falk Mar 27 '18 at 09:38
  • from the top of my head, i'd say concat a_id and b_id under a new column (c_id) and group by date pivot on c_id and use values how you see fit – eliasah Mar 27 '18 at 09:44
  • @eliasah Yeah, that's working. Only part that's missing would be the resampling - idk I guess that might be another question. The thing is I need to align those time series somehow. – Stefan Falk Mar 27 '18 at 09:58

2 Answers


As mentioned in the comments, here is a solution to pivot your data:

You should concat your columns a_id and b_id into a new column c_id, group by date, then pivot on c_id and aggregate the values however you see fit.
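
A minimal sketch of that approach, assuming the df_test DataFrame from the question, weekly truncation via date_trunc (Spark 2.3+), and illustrative names c_id, week and pivoted:

from pyspark.sql import functions as F

# Combine a_id and b_id into a single sensor key, truncate each timestamp to the
# start of its week, then pivot so that every sensor becomes one column
pivoted = (df_test
    .withColumn("c_id", F.concat(F.col("a_id").cast("string"),
                                 F.col("b_id").cast("string")))
    .withColumn("week", F.date_trunc("week", "time").cast("date"))
    .groupBy("week")
    .pivot("c_id")
    .agg(F.avg("value"))
    .orderBy("week"))

pivoted.show(5)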

As for resampling, I'd point you to the solution provided by @zero323 here.

eliasah

You can "resample" your data using pyspark.ml.feature.Bucketizer

from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer

# Concatenate "a_id" and "b_id" into a single sensor key "id", then
# truncate the timestamp to day precision and convert it to unixtime
df = (df.withColumn("id", F.concat(F.col("a_id").cast("string"), F.col("b_id").cast("string")))
        .withColumn("tt", F.unix_timestamp(F.date_trunc("day", "time"))))
df.show(5)

# +----+----+--------------------+--------------------+---+----------+
# |a_id|b_id|               value|                time| id|        tt|
# +----+----+--------------------+--------------------+---+----------+
# |   0|   0| 0.15601864044243652|2007-09-28 15:12:...| 00|1190962800|
# |   0|   0|  0.6011150117432088|2015-09-08 01:59:...| 00|1441695600|
# |   0|   0|  0.9699098521619943|2012-01-10 07:51:...| 00|1326182400|
# |   0|   0| 0.18182496720710062|2011-08-28 19:58:...| 00|1314514800|
# |   0|   0|  0.5247564316322378|2015-11-15 14:18:...| 00|1447574400|
# +----+----+--------------------+--------------------+---+----------+

# Get the minimum and maximum dates
tmin = df.select(F.min("tt")).collect()[0][0]
tmax = df.select(F.max("tt")).collect()[0][0]

# Get the number of seconds in a week
week = 60 * 60 * 24 * 7

# Get a list of bucket splits (add infinity for last split if weeks don't evenly divide)
splits = list(range(tmin, tmax, week)) + [float("inf")]

# Create the bucketizer and bucket your data
bucketizer = Bucketizer(inputCol="tt", outputCol="weeks", splits=splits)
bucketed_df = bucketizer.transform(df)
bucketed_df.show(5)

# +----+----+-------------------+--------------------+---+----------+-----+
# |a_id|b_id|              value|                time| id|        tt|weeks|
# +----+----+-------------------+--------------------+---+----------+-----+
# |   0|   0|0.15601864044243652|2007-09-28 15:12:...| 00|1190962800|403.0|
# |   0|   0| 0.6011150117432088|2015-09-08 01:59:...| 00|1441695600|818.0|
# |   0|   0| 0.9699098521619943|2012-01-10 07:51:...| 00|1326182400|627.0|
# |   0|   0|0.18182496720710062|2011-08-28 19:58:...| 00|1314514800|607.0|
# |   0|   0| 0.5247564316322378|2015-11-15 14:18:...| 00|1447574400|827.0|
# +----+----+-------------------+--------------------+---+----------+-----+

# Convert the buckets back to a date (seconds in week * bucket value + min date)
bucketed_df = bucketed_df.withColumn(
    "time",
    F.from_unixtime(F.col("weeks") * week + tmin).cast("date"))
bucketed_df.show(5)

# +----+----+-------------------+----------+---+----------+-----+
# |a_id|b_id|              value|      time| id|        tt|weeks|
# +----+----+-------------------+----------+---+----------+-----+
# |   0|   0|0.15601864044243652|2007-09-24| 00|1190962800|403.0|
# |   0|   0| 0.6011150117432088|2015-09-07| 00|1441695600|818.0|
# |   0|   0| 0.9699098521619943|2012-01-09| 00|1326182400|627.0|
# |   0|   0|0.18182496720710062|2011-08-22| 00|1314514800|607.0|
# |   0|   0| 0.5247564316322378|2015-11-09| 00|1447574400|827.0|
# +----+----+-------------------+----------+---+----------+-----+

# Finally, do the groupBy and pivot as already explained
# (the "id" column concatenating "a_id" and "b_id" was created in the first step)
final_df = bucketed_df.groupBy("time").pivot("id").agg(F.avg("value"))
final_df.show(10)


#    +----------+--------------------+--------------------+-------------------+-------------------+
#    |      time|                  00|                  01|                 10|                 11|
#    +----------+--------------------+--------------------+-------------------+-------------------+
#    |2015-03-09|0.045227288910538066|  0.8633336495718252| 0.8229838050417675|               null|
#    |2000-07-03|                null|                null| 0.7855315583735368|               null|
#    |2013-09-09|  0.6334037565104235|                null|0.14284196481433187|               null|
#    |2005-06-06|                null|  0.9095933818175037|               null|               null|
#    |2017-09-11|                null|  0.9684887775943838|               null|               null|
#    |2004-02-23|                null|  0.3782888656818202|               null|0.26674411859262276|
#    |2004-07-12|                null|                null| 0.2528581182501112| 0.4189697737795244|
#    |2000-12-25|                null|                null| 0.5473347601436167|               null|
#    |2016-04-25|                null|  0.9918099513493635|               null|               null|
#    |2016-10-03|                null|0.057844449447160606| 0.2770125243259788|               null|
#    +----------+--------------------+--------------------+-------------------+-------------------+

That will get you almost to what you need. Unfortunately, implementing the pandas.DataFrame.ffill() and pandas.DataFrame.bfill() methods is not nearly as easy in PySpark as it is in pandas, due to the distributed nature of the data. See here and here for suggestions.
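
One common workaround (not necessarily what the linked answers do) is a window-based fill using last and first with ignorenulls; here is a minimal sketch over the pivoted final_df above, assuming the weekly series is small enough that an unpartitioned window is acceptable:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Forward-fill window: all rows up to and including the current one, ordered by time
w_ffill = Window.orderBy("time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# Backward-fill window: the current row and everything after it
w_bfill = Window.orderBy("time").rowsBetween(Window.currentRow, Window.unboundedFollowing)

sensor_cols = [c for c in final_df.columns if c != "time"]
filled_df = final_df.select(
    "time",
    *[F.coalesce(F.last(F.col(c), ignorenulls=True).over(w_ffill),
                 F.first(F.col(c), ignorenulls=True).over(w_bfill)).alias(c)
      for c in sensor_cols])

Note that an ordered window without a partition column moves all rows into a single partition, which is fine for a weekly-aggregated series but will not scale to the raw sensor data.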

kshell