
I am working on the CSV file below using PySpark (on Databricks), but I am not sure how to get the total duration of the scan events. Assume only one scan runs at a time.

  timestamp                   event    value
1 2020-11-17_19:15:33.438102  scan     start
2 2020-11-17_19:18:33.433002  scan     end
3 2020-11-17_20:05:21.538125  scan     start
4 2020-11-17_20:13:08.528102  scan     end
5 2020-11-17_21:23:19.635104  pending  start
6 2020-11-17_21:33:26.572123  pending  end
7 2020-11-17_22:05:29.738105  pending  start
.........

Below are some of my thoughts:

First, get the scan start times:

scan_start = df[(df['event'] == 'scan') & (df['value'] == 'start')]

scan_start_time = scan_start['timestamp']

Then get the scan end times:

scan_end = df[(df['event'] == 'scan') & (df['value'] == 'end')]

scan_end_time = scan_end['timestamp']

Compute the duration of each scan:

each_duration = scan_end_time.values - scan_start_time.values

Sum to get the total duration:

total_duration_ns = each_duration.sum()
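
Putting these thoughts together, here is a rough pandas sketch of what I mean (just for illustration, with the sample rows hard-coded and the timestamps already parsed):

import pandas as pd

# sample rows hard-coded for illustration
df = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2020-11-17 19:15:33.438102", "2020-11-17 19:18:33.433002",
        "2020-11-17 20:05:21.538125", "2020-11-17 20:13:08.528102"]),
    "event": ["scan", "scan", "scan", "scan"],
    "value": ["start", "end", "start", "end"],
})

scan_start = df[(df["event"] == "scan") & (df["value"] == "start")]
scan_end = df[(df["event"] == "scan") & (df["value"] == "end")]

# relies on start/end rows being paired up in order (one scan at a time)
each_duration = scan_end["timestamp"].values - scan_start["timestamp"].values
total_duration = each_duration.sum()   # numpy timedelta64
print(total_duration)                  # roughly 10 minutes 47 seconds on the sample rows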

But I am not sure how to do this calculation in PySpark.

First, do we need to define a schema so that the 'timestamp' column is read with a timestamp type? (Assume all three columns, timestamp, event, and value, are read as strings.)

On the other hand, assume we have many (1000+) CSV files similar to the one above stored in Databricks. How can we write reusable code that processes all of them and eventually creates one table storing the total scan duration?

Can someone please share some PySpark code for this? Thank you so much.

  • Do events have an id to identify them? Are scans run sequentially, that means a new scan can only start after the previous one has ended? Does the dataframe come already sorted by timestamp? How do you want to deal with "pending" events? – user2314737 Feb 09 '22 at 19:33

1 Answer


This code will compute for each row the difference between the current timestamp and the timestamp in the previous row.

I'm creating a dataframe for reproducibility.

from pyspark.sql import SparkSession, Window
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_replace, col, lag
import pandas as pd

spark = SparkSession.builder.appName("DataFrame").getOrCreate()
data = pd.DataFrame(
    {
        "timestamp": ["2020-11-17_19:15:33.438102","2020-11-17_19:18:33.433002","2020-11-17_20:05:21.538125","2020-11-17_20:13:08.528102"],
        "event": ["scan","scan","scan","scan"],
        "value": ["start","end","start","end"]
    }
)
df=spark.createDataFrame(data)
df.show()
# +--------------------+-----+-----+                                              
# |           timestamp|event|value|
# +--------------------+-----+-----+
# |2020-11-17_19:15:...| scan|start|
# |2020-11-17_19:18:...| scan|  end|
# |2020-11-17_20:05:...| scan|start|
# |2020-11-17_20:13:...| scan|  end|
# +--------------------+-----+-----+
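
In practice you would read the CSV directly instead of hard-coding the rows. A minimal sketch, assuming a header row and a hypothetical file path; all three columns are read as strings and the timestamp is converted in the next step:

from pyspark.sql.types import StructType, StructField, StringType

# explicit schema: read all three columns as strings, convert "timestamp" later
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("event", StringType(), True),
    StructField("value", StringType(), True),
])

df = (spark.read
      .option("header", "true")
      .schema(schema)
      .csv("/path/to/scan_events.csv"))   # hypothetical path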

Convert "timestamp" column to TimestampType() to be able to compute differences:

df = df.withColumn("timestamp", regexp_replace(col("timestamp"), "_", " "))
df.show(truncate=False)
# +--------------------------+-----+-----+
# |timestamp                 |event|value|
# +--------------------------+-----+-----+
# |2020-11-17 19:15:33.438102|scan |start|
# |2020-11-17 19:18:33.433002|scan |end  |
# |2020-11-17 20:05:21.538125|scan |start|
# |2020-11-17 20:13:08.528102|scan |end  |
# +--------------------------+-----+-----+

df = df.withColumn("timestamp",
                   regexp_replace(col("timestamp"), "_", " ").cast(TimestampType()))
df.dtypes
#  [('timestamp', 'timestamp'), ('event', 'string'), ('value', 'string')]
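
As an alternative to the replace-then-cast above, you could parse the original string directly with to_timestamp and an explicit pattern. This is only a sketch, applied while "timestamp" is still a string (the dataframe is called df_raw here for illustration), and it assumes Spark 3.x datetime patterns:

from pyspark.sql.functions import to_timestamp, col

# df_raw: the dataframe with "timestamp" still a string (hypothetical name)
df_parsed = df_raw.withColumn(
    "timestamp",
    to_timestamp(col("timestamp"), "yyyy-MM-dd_HH:mm:ss.SSSSSS"))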

Use the pyspark.sql.functions.lag function, which returns the value of the previous row (offset=1 by default). See also How to calculate the difference between rows in PySpark? or Applying a Window function to calculate differences in pySpark.

df.withColumn("lag_previous", col("timestamp").cast("long") - lag('timestamp').over(
    Window.orderBy('timestamp')).cast("long")).show(truncate=False)
# WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Using a Window without a partition gives a warning. It is better to partition the dataframe for the window operation; here I partitioned by event type:

df.withColumn("lag_previous", col("timestamp").cast("long") - lag('timestamp').over(
    Window.partitionBy("event").orderBy('timestamp')).cast("long")).show(truncate=False)
# +--------------------------+-----+-----+------------+
# |timestamp                 |event|value|lag_previous|
# +--------------------------+-----+-----+------------+
# |2020-11-17 19:15:33.438102|scan |start|null        |
# |2020-11-17 19:18:33.433002|scan |end  |180         |
# |2020-11-17 20:05:21.538125|scan |start|2808        |
# |2020-11-17 20:13:08.528102|scan |end  |467         |
# +--------------------------+-----+-----+------------+

From this table you can keep only the rows with value "end" (their lag_previous is the scan duration in seconds) and sum them to get the total duration.
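
For example, a minimal sketch of that final step (it rebuilds the window and the lag column, then filters and aggregates):

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, sum as sum_

w = Window.partitionBy("event").orderBy("timestamp")

scan_durations = (df
    .withColumn("lag_previous",
                col("timestamp").cast("long") - lag("timestamp").over(w).cast("long"))
    .filter((col("event") == "scan") & (col("value") == "end")))

scan_durations.agg(sum_("lag_previous").alias("total_scan_duration_s")).show()
# expected on the sample data: 180 + 467 = 647 seconds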

  • Thank you for your explanations. However, what about if I have multiple files that need to be tested out. Are there any other special steps that I need to do? – Lawrence Feb 09 '22 at 23:16
  • @Lawrence you can load them all in a dataframe if they have the same format, just give the name of the input folder in place of the input file – user2314737 Feb 10 '22 at 10:19 (a sketch of this approach follows these comments)
  • thank you for all the suggestions!! – Lawrence Feb 10 '22 at 17:35
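
Following up on the comment above about reading a whole folder: a minimal sketch of a reusable version that loads every CSV file in a folder, computes the total scan duration per file, and writes the result to one table. The folder path and the table name scan_durations are placeholders:

from pyspark.sql import Window
from pyspark.sql.functions import (input_file_name, regexp_replace, col, lag,
                                   sum as sum_)

# read every CSV in the folder at once and remember which file each row came from
raw = (spark.read
       .option("header", "true")
       .csv("/path/to/scan_csv_folder/")             # hypothetical folder
       .withColumn("source_file", input_file_name())
       .withColumn("timestamp",
                   regexp_replace(col("timestamp"), "_", " ").cast("timestamp")))

# lag within each file and event type, keep the scan "end" rows, sum per file
w = Window.partitionBy("source_file", "event").orderBy("timestamp")
totals = (raw
    .withColumn("duration_s",
                col("timestamp").cast("long") - lag("timestamp").over(w).cast("long"))
    .filter((col("event") == "scan") & (col("value") == "end"))
    .groupBy("source_file")
    .agg(sum_("duration_s").alias("total_scan_duration_s")))

totals.write.mode("overwrite").saveAsTable("scan_durations")   # hypothetical table name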