
I am working with big time-series data using PySpark. The data is 100 GB or more, with rows numbering in the millions or billions, and I am new to big data with PySpark. I want to resample (downsample) the data: the original data is recorded at 10 Hz, with timestamps in milliseconds, and I want to convert it to 1 Hz, in seconds. It would be really helpful if you could give me some ideas. It would also be great if you could recommend any documentation/solutions I can use to deal with huge data in Spark. Below is sample data. DF=

start_timestamp end_timestamp value
2020-11-05 03:25:02.088 2020-11-05 04:10:19.288 0.0
2020-11-05 04:24:25.288 2020-11-05 04:24:25.218 0.4375
2020-11-05 04:24:25.218 2020-11-05 04:24:25.318 1.0625
2020-11-05 04:24:25.318 2020-11-05 04:24:25.418 1.21875
2020-11-05 04:24:25.418 2020-11-05 04:24:25.518 1.234375
2020-11-05 04:24:25.518 2020-11-05 04:24:25.618 1.265625
2020-11-05 04:24:25.618 2020-11-05 04:24:25.718 1.28125

I tried the code which I got from: PySpark: how to resample frequencies

Here is my sample code:

from pyspark.sql.functions import col, min as min_, max as max_, timestamp_seconds

day = 1   # bucket size in seconds (60 * 60 * 24 for daily buckets)

# truncate each timestamp to the start of its bucket, in epoch seconds
epoch = (col("start_timestamp").cast("bigint") / day).cast("bigint") * day

with_epoch = distinctDF.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()

# reference frame with one row per second over the whole time span
ref = spark.range(
    min_epoch, max_epoch + 1, day
).toDF("epoch")

(ref
    .join(with_epoch, "epoch", "left")
    .orderBy("epoch")
    .withColumn("start_timestamp_resampled", timestamp_seconds("epoch"))
    .show(15, False))

The code is working, but I am not sure whether it is correct. The output looks like below, but it is showing nulls in the columns.

epoch start_timestamp end_timestamp value start_timestamp_resampled
1604546702 2020-11-05 03:25:02.088 2020-11-05 04:10:19.288 0.0 2020-11-05 03:25:02
1604546703 null null null 2020-11-05 03:25:03
1604546704 null null null 2020-11-05 03:25:04
1604546705 null null null 2020-11-05 03:25:05
1604546706 null null null 2020-11-05 03:25:06
1604546707 null null null 2020-11-05 03:25:07
SSS
  • Welcome to [Stack Overflow](https://stackoverflow.com/)! Questions that ask for general guidance regarding a problem approach are typically too broad and are not a good fit for this site. People have their own method for approaching the problem and because of this there cannot be a correct answer. Give a good read over [Where to Start](https://softwareengineering.meta.stackexchange.com/questions/6366/where-to-start/6367#6367) and [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example), then edit your post. – itprorh66 Nov 17 '21 at 15:58
  • A frequency of 10Hz is not a millisecond period, so your question title is already self-contradicting. – Ulrich Eckhardt Nov 18 '21 at 12:56
  • @UlrichEckhardt my timestamp looks like this: "2020-11-05 03:25:02.088", so .088 is in milliseconds, and all data is recorded at 10 Hz. So I want to downsample it to 1 Hz (i.e. milliseconds to seconds). – SSS Nov 22 '21 at 10:35
  • Hold on: There is the unit (e.g. seconds in 1.52s), the resolution (e.g. 1/50th second or 0.02s) and the sample rate (e.g. 20Hz frequency or 0.05s period). Note that your timestamps don't have a unit but are in a mixed form of different units (not sure about the proper term). Then, you can downsample but still keep the resolution. You can also convert the unit, e.g. from that mixed form to seconds or minutes. Also, of course, you can discard precision and round to a smaller resolution. What exactly do you want? Maybe, for a set of example inputs, you could provide the expected outputs. – Ulrich Eckhardt Nov 22 '21 at 12:40

1 Answer


When downsampling you have to think about how you want to handle the data you're losing.

Using a join, you will only get data where timestamps match. But you could also decide to aggregate the data points using: mean, max, min, sum...

The way I would do it:

import pyspark.sql.functions as F

# truncate each millisecond timestamp to the second it falls in, then aggregate per second
df = df.withColumn("Timestamp_resampled", F.date_trunc("second", F.col("start_timestamp")))
df = df.groupby("Timestamp_resampled").agg(<function of your choice>)
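For example, here is a minimal sketch assuming you want the mean value per second (the DF name and column names are taken from the sample data in the question):

import pyspark.sql.functions as F

resampled = (DF
    .withColumn("Timestamp_resampled", F.date_trunc("second", F.col("start_timestamp")))
    .groupBy("Timestamp_resampled")
    .agg(F.avg("value").alias("value"),             # mean of the ~10 samples in each second
         F.count("*").alias("samples_per_second"))  # optional: how many raw rows were collapsed
    .orderBy("Timestamp_resampled"))

resampled.show(15, False)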

Then, once resampled, if you have missing timestamps you could use your method with a join and the epoch range to populate the missing timestamps and make sure you have one row for every second.
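A minimal sketch of that second step, reusing the epoch-range idea from the question (it assumes the aggregated resampled DataFrame from above, an active SparkSession named spark, and Spark 3.1+ for timestamp_seconds):

from pyspark.sql.functions import col, min as min_, max as max_, timestamp_seconds

# bounds of the time span, in epoch seconds
lo, hi = resampled.select(
    min_(col("Timestamp_resampled").cast("bigint")),
    max_(col("Timestamp_resampled").cast("bigint"))).first()

# reference grid with one row per second between the first and last observation
grid = spark.range(lo, hi + 1).select(timestamp_seconds(col("id")).alias("Timestamp_resampled"))

# seconds with no data keep null values; fill or interpolate them as needed
filled = grid.join(resampled, "Timestamp_resampled", "left").orderBy("Timestamp_resampled")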

Yoan B. M.Sc
  • Thanks. As my main DF column "start_timestamp" is in milliseconds (format='yyyy-MM-dd HH:mm:ss.sss'), I'm a little confused and got an error: "SyntaxError: positional argument follows keyword argument". As per your solution, this code should do the downsampling, right? ''' DF1 = DF.withColumn("start_timestamp", F.date_trunc(format='yyyy-MM-dd HH:mm:ss', timestamp)) df = DF1.groupby("start_timestamp").agg(mean) ''' – SSS Nov 22 '21 at 10:47
  • @SSS, try passing the 'timestamp' argument first, then the 'format'; that is what the error means. – Yoan B. M.Sc Nov 22 '21 at 13:52
  • @SSS, I've updated the answer, feel free to accept it if that works ! – Yoan B. M.Sc Dec 07 '21 at 13:39