I have a data set in the following format:
import numpy as np
import pandas as pd

# Create the data set: two nested keys (a_id, b_id), each with
# randomly timestamped readings; ~5% of readings are dropped
np.random.seed(42)
records = list()
for i in range(2):
    for j in range(2):
        for k in range(500):
            t = np.random.randint(pd.Timestamp('2000-01-01').value, pd.Timestamp('2018-01-01').value)
            if np.random.rand() > .95: continue
            ts = pd.Timestamp(t).strftime('%Y-%m-%d %H:%M:%S.%f')
            records.append((i, j, np.random.rand(), ts))
df = pd.DataFrame.from_records(records)
df.columns = ['a_id', 'b_id', 'value', 'time']
Which looks like this:
a_id b_id value time
0 0 0 0.156019 2007-09-28 15:12:24.260596
1 0 0 0.601115 2015-09-08 01:59:18.043399
2 0 0 0.969910 2012-01-10 07:51:29.662492
3 0 0 0.181825 2011-08-28 19:58:33.281289
4 0 0 0.524756 2015-11-15 14:18:17.398715
5 0 0 0.611853 2015-01-07 23:44:37.034322
6 0 0 0.366362 2008-06-21 11:56:10.529679
7 0 0 0.199674 2010-11-08 18:24:18.794838
8 0 0 0.046450 2008-04-27 02:36:46.026876
Here a_id and b_id together form the key of a sensor, so the data frame has to be transformed like this:
# Pivot so that each (a_id, b_id) sensor pair becomes its own column
df_ = pd.pivot_table(df, index='time', columns=['a_id', 'b_id'], values='value')
# Parse the string index into timestamps, resample to weekly means,
# and fill the remaining gaps forwards and backwards
df_.index = pd.to_datetime(df_.index)
df_ = df_.resample('1W').mean().ffill().bfill()
After resampling and filling the gaps, the data is in the desired format:
a_id 0 1
b_id 0 1 0 1
2000-01-09 0.565028 0.560434 0.920740 0.458825
2000-01-16 0.565028 0.146963 0.920740 0.217588
2000-01-23 0.565028 0.840872 0.920740 0.209690
2000-01-30 0.565028 0.046852 0.920740 0.209690
2000-02-06 0.565028 0.046852 0.704871 0.209690
Each column now contains the data of one sensor. The problem is that I do not know how to do this in PySpark.
from pyspark.sql import functions as F

# Parse the string column into a proper timestamp
# (note: to_utc_timestamp takes a timezone, not a format string,
# so to_timestamp is used instead)
df_test = spark.createDataFrame(df) \
    .withColumn('time', F.to_timestamp('time'))
df_test.printSchema()
This gives:
root
|-- a_id: long (nullable = true)
|-- b_id: long (nullable = true)
|-- value: double (nullable = true)
|-- time: timestamp (nullable = true)
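(In case an explicit format is needed here: as far as I understand, Spark expects Java-style datetime patterns rather than strftime codes, which is why my original '%Y-%m-%d %H:%M:%S.%f' did not work. My guess for the equivalent pattern would be something like the following, though I am not certain about the fractional-seconds part:)

# Assumption on my side: Spark 3.x datetime patterns, where fractional
# seconds are written with 'S' letters instead of strftime's %f
df_test = spark.createDataFrame(df) \
    .withColumn('time', F.to_timestamp('time', 'yyyy-MM-dd HH:mm:ss.SSSSSS'))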
How can I transform df_test such that it has the same form as df_?
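For reference, here is a rough sketch of what I have come up with so far, assuming groupBy().pivot() plus window functions are the right tools. The column names sensor and week are my own invention, and unlike pandas' resample this does not create rows for weeks in which no sensor reported anything:

from pyspark.sql import Window

# Spark's pivot() accepts only a single column, so combine the two
# key columns into one sensor key first
df_keyed = df_test.withColumn('sensor', F.concat_ws('_', 'a_id', 'b_id'))

# Bucket each timestamp into its week and average per sensor,
# so that every sensor becomes a column
df_weekly = df_keyed \
    .withColumn('week', F.date_trunc('week', 'time')) \
    .groupBy('week') \
    .pivot('sensor') \
    .agg(F.mean('value')) \
    .orderBy('week')

# Forward-fill gaps: carry the last non-null value down each column
# (the unpartitioned window pulls everything onto one partition,
# which should be fine for weekly-aggregated data)
w = Window.orderBy('week').rowsBetween(Window.unboundedPreceding, Window.currentRow)
for c in df_weekly.columns:
    if c != 'week':
        df_weekly = df_weekly.withColumn(c, F.last(c, ignorenulls=True).over(w))

This seems to produce one column per sensor, but I am not sure it is equivalent to the pandas version (in particular the missing empty weekly buckets and the initial back-fill), so any pointers would be appreciated.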