
Imagine a large dataset (a >40 GB Parquet file) containing observations of thousands of variables, stored as triples (variable, timestamp, value).

Now think of a query that is only interested in a subset of 500 variables, and that retrieves the observations (values, i.e. time series) for those variables within specific observation windows (timeframes), each having a start and end time.
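For concreteness, here is the assumed shape of the data (the column names Variable, Time, and Value, as well as the path, are illustrative; they match the names used in the code below):

# assumed layout of the big Parquet file (names are illustrative):
#   Variable: string    -- variable name
#   Time:     timestamp -- observation time
#   Value:    double    -- observed value
df_all = sqlContext.read.parquet("/path/to/observations")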

Without distributed computing (Spark), you could code it like this:

# naive version: one filtered dataframe per (variable, incident) pair
for var_ in variables_of_interest:
    for incident in incidents:

        var_df = df_all.filter(
            (df_all.Variable == var_)
            & (df_all.Time > incident.startTime)
            & (df_all.Time < incident.endTime))

My question is: how can I do this with Spark/PySpark? I was thinking of either:

  1. joining the incidents somehow with the variables and filtering the dataframe afterwards.
  2. broadcasting the incident dataframe and using it within a map function when filtering the variable observations (df_all); see the sketch after this list.
  3. using RDD.cartesian or RDD.mapPartitions somehow (remark: the parquet file was saved partitioned by variable).
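A minimal sketch of idea #2, kept version-agnostic so it also runs on Spark 1.4 (the column names Variable/Time and the incident attributes startTime/endTime are assumptions carried over from the loop above):

# sketch of idea #2: broadcast the small incident list and tag each
# observation row with the indices of the incidents it falls into
vars_wanted = set(variables_of_interest)
bc_incidents = sc.broadcast(list(enumerate(incidents)))

def tag_with_incident(row):
    # yield (incident_index, row) for every incident the row falls into
    if row.Variable not in vars_wanted:
        return
    for i, incident in bc_incidents.value:
        if incident.startTime < row.Time < incident.endTime:
            yield (i, row)

tagged = df_all.rdd.flatMap(tag_with_incident)

The result is a pair RDD keyed by incident index, which can then be grouped or written out per incident.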

The expected output should be:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

Where dataframe 1 contains all variables and their observed values within the timeframe of incident 1, and dataframe 2 contains those values within the timeframe of incident 2.

I hope you got the idea.

UPDATE

I tried to code a solution based on idea #1 and the code from the answer given by zero323. It works quite well, but I wonder how to aggregate/group it by incident in the final step? I tried adding a sequential number to each incident, but then I got errors in the last step. It would be cool if you could review and/or complete the code. Therefore I uploaded sample data and the scripts. The environment is Spark 1.4 (PySpark).
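For that missing final step, here is a hypothetical sketch (not tested against the uploaded sample data): number the incidents in plain Python while building the reference frame from idea #1, carry that id through the join, and filter per incident afterwards. All names besides df_all are illustrative.

# give every incident a sequential id before the join
ref = sc.parallelize([
    (i, var_, incident.startTime, incident.endTime)
    for i, incident in enumerate(incidents)
    for var_ in variables_of_interest
]).toDF(["incident_id", "var_", "startTime", "endTime"])

joined = df_all.join(
    ref,
    (df_all.Variable == ref.var_)
    & (df_all.Time > ref.startTime)
    & (df_all.Time < ref.endTime))

# one (lazy) dataframe per incident, matching the expected output above
per_incident = {i: joined.filter(joined.incident_id == i)
                for i in range(len(incidents))}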

Matthias
  • Not so much of a large dataset, it's not even a tera. A big one though! :) What have you tried so far? – gsamaras Aug 25 '16 at 22:58
  • I read tons of posts and examples just to get an idea of how I could solve it, but I haven't implemented anything yet. My first shot would be filtering the variables with the isin function into df2, then broadcasting the incident dataframe and using map on df2. But I'm not sure how to get (yield) those dataframes (observations) for each incident. Somehow stuck. – Matthias Aug 25 '16 at 23:01
  • `join` looks like a sensible starting point. You have enough to avoid a Cartesian product, and with 500 records this can easily be optimized into a broadcast join. – zero323 Aug 26 '16 at 00:10
  • Any example? I will try to code one today. – Matthias Aug 26 '16 at 10:46
  • I added sample code and script above. Please review. – Matthias Aug 28 '16 at 14:03

1 Answer


Generally speaking, only the first approach looks sensible to me. The exact joining strategy will depend on the number of records and their distribution, but you can either create a top-level data frame:

ref = sc.parallelize([
    (var_, incident)
    for var_ in variables_of_interest
    for incident in incidents
]).toDF(["var_", "incident"])

and simply join:

from pyspark.sql.functions import col

same_var = col("Variable") == col("var_")
# incident is a struct column, so its fields are addressed with dot notation
same_time = col("Time").between(
    col("incident.startTime"),
    col("incident.endTime")
)

ref.join(df.alias("df"), same_var & same_time)

or perform joins against particular partitions:

incidents_ = sc.parallelize([
   (incident, ) for incident in incidents
]).toDF(["incident"])

for var_ in variables_of_interest:
    # one directory per variable, since the file was partitioned by variable
    # (sqlContext rather than spark, because the environment is Spark 1.4)
    df = sqlContext.read.parquet("/some/path/Variable={0}".format(var_))
    df.join(incidents_, same_time)

optionally marking one side as small enough to be broadcasted.
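For example, a sketch of that hint (pyspark.sql.functions.broadcast only ships with newer Spark releases; on 1.4, tables below spark.sql.autoBroadcastJoinThreshold are broadcast automatically):

from pyspark.sql.functions import broadcast

# explicit broadcast hint for the small incident table (newer Spark only)
df.join(broadcast(incidents_), same_time)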

zero323
  • Hmm, thanks for the example. I will need some time to understand it. Could you contact me on Skype, just to discuss some details? nagilo12345 – Matthias Aug 26 '16 at 13:36
  • Sorry @Matthias, I don't have an account anymore. – zero323 Aug 26 '16 at 21:09
  • Hi Zero. I tried using your code in a script and it works fine so far. The only thing that I don't get is how to add a number to each incident so that I can use it after the final join step to select the data from the resulting frame by incident number. You can find the scripts in the updated question above. Please review, thanks! – Matthias Aug 28 '16 at 14:04
  • Sorry, I am not much around here lately. I'll try to take a look when I have a spare moment. – zero323 Sep 01 '16 at 19:50
  • Yeah, Stack Overflow is not the fastest way to communicate ;) thanks for your support anyway. – Matthias Sep 07 '16 at 14:52