I have a dataset like this:

timestamp     vars 
2             [1,2]
2             [1,2]
3             [1,2,3]
3             [1,2]

And I want a dataframe like the one below. Each value in the arrays above becomes an index in the output array, and the frequency of that value is stored at that index. The computation is done separately for each unique timestamp.

timestamp     vars 
2             [0, 2, 2]
3             [0,2,2,1]

Right now, I'm grouping by timestamp and aggregating/flattening vars (to get something like 1,2,1,2 for timestamp 2, or 1,2,3,1,2 for timestamp 3). I then have a UDF that uses collections.Counter to get a key->value dict, which I turn into the format I want.
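As a rough illustration of that per-timestamp step, here is a minimal plain-Python sketch. The function name `counts_to_dense` is hypothetical, and it assumes every value is a non-negative integer; in the actual job this logic would live inside the UDF.

```python
from collections import Counter

def counts_to_dense(values):
    """Return a list where position i holds how often i occurs in values."""
    if not values:
        return []
    freq = Counter(values)
    # A Counter returns 0 for keys it has never seen, so gaps become zeros.
    return [freq[i] for i in range(max(freq) + 1)]

print(counts_to_dense([1, 2, 1, 2]))     # flattened vars for timestamp 2
print(counts_to_dense([1, 2, 3, 1, 2]))  # flattened vars for timestamp 3
```

The two calls reproduce the desired rows `[0, 2, 2]` and `[0, 2, 2, 1]`.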

The groupBy/agg can get arbitrarily large (array sizes can be in the millions), and this seems like a good use case for a Window function, but I'm not sure how to put it all together.

I think it's also worth mentioning that I've tried repartitioning, and converting to an RDD and using groupByKey. Both are extremely slow (>24 hours) on large datasets.

tanyabrown
  • for index 2 how does it go from 1,2,1,2 to [0,2,2]? windows with partitionby clause should out perform groupby, and performance could be better if you didnt use udf and instead used spark inbuilt funcs to achieve ur goal – murtihash Mar 04 '20 at 01:28
  • There are 2 1s and 2 2s in [1,2,1,2]. So at index 1, I put 2 (the frequency) and at index 2, I put 2. Since there's no 0, index 0 stays 0. Hence, [0,2,2]. I'm not sure how to get from [1,2] and [1,2] to [1,2,1,2] using partitionBy and window. Tried but it only works for sums. – tanyabrown Mar 04 '20 at 02:18

1 Answer

Edit: As discussed in the comments, the problem with the original methods is probably that counting with the filter or aggregate functions triggers unnecessary data scans. Below, we explode the arrays and do the aggregation (count) before creating the final array column:

from pyspark.sql.functions import collect_list, struct  

df = spark.createDataFrame([(2,[1,2]), (2,[1,2]), (3,[1,2,3]), (3,[1,2])],['timestamp', 'vars'])

df.selectExpr("timestamp", "explode(vars) as var") \
    .groupby('timestamp','var') \
    .count() \
    .groupby("timestamp") \
    .agg(collect_list(struct("var","count")).alias("data")) \
    .selectExpr(
        "timestamp",
        "transform(data, x -> x.var) as indices",
        "transform(data, x -> x.count) as values"
    ).selectExpr(
        "timestamp",
        "transform(sequence(0, array_max(indices)), i -> IFNULL(values[array_position(indices,i)-1],0)) as new_vars"
    ).show(truncate=False)
+---------+------------+
|timestamp|new_vars    |
+---------+------------+
|3        |[0, 2, 2, 1]|
|2        |[0, 2, 2]   |
+---------+------------+

Where:

(1) explode the array and count() the rows for each timestamp + var pair

(2) group by timestamp and create an array of structs with two fields, var and count

(3) convert the array of structs into two arrays, indices and values (similar to how a SparseVector is defined)

(4) transform sequence(0, array_max(indices)): for each i in the sequence, use array_position to find the position of i in the indices array, then retrieve the value at the same position from the values array:

IFNULL(values[array_position(indices,i)-1],0)

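Note that array_position returns a 1-based position (0 when the element is not found), while array indexing is 0-based, hence the -1 in the expression above. For an i that does not occur in indices, the lookup is expected to yield NULL, which IFNULL replaces with 0.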
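To make the index arithmetic in step (4) concrete, the following plain-Python sketch mimics the expression. The helpers `array_position`, `element_or_none`, and `to_dense` are hypothetical stand-ins written for illustration, not Spark APIs, and the sketch assumes Spark's non-ANSI behaviour in which an out-of-range array index yields NULL instead of raising an error.

```python
def array_position(arr, value):
    """Stand-in for Spark SQL array_position: 1-based, 0 when absent."""
    try:
        return arr.index(value) + 1
    except ValueError:
        return 0

def element_or_none(arr, idx):
    """Stand-in for 0-based indexing that gives NULL (None) out of range."""
    return arr[idx] if 0 <= idx < len(arr) else None

def to_dense(indices, values):
    """Expand parallel indices/values arrays into a dense count list."""
    dense = []
    for i in range(max(indices) + 1):
        v = element_or_none(values, array_position(indices, i) - 1)
        dense.append(0 if v is None else v)  # plays the role of IFNULL(..., 0)
    return dense

# The order of the collected structs does not matter for the result.
print(to_dense([3, 1, 2], [1, 2, 2]))
```

With `indices = [3, 1, 2]` and `values = [1, 2, 2]`, index 0 is missing, so its lookup position is 0 - 1 = -1, which falls out of range and becomes 0 in the output `[0, 2, 2, 1]`.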

Old methods:

(1) Use transform + filter/size

from pyspark.sql.functions import flatten, collect_list

df.groupby('timestamp').agg(flatten(collect_list('vars')).alias('data')) \
  .selectExpr(
    "timestamp", 
    "transform(sequence(0, array_max(data)), x -> size(filter(data, y -> y = x))) as vars"
  ).show(truncate=False)
+---------+------------+
|timestamp|vars        |
+---------+------------+
|3        |[0, 2, 2, 1]|
|2        |[0, 2, 2]   |
+---------+------------+

(2) Use aggregate function:

df.groupby('timestamp').agg(flatten(collect_list('vars')).alias('data')) \
   .selectExpr("timestamp", """ 

     aggregate(   
       data,         
       /* use an array as zero_value, size = array_max(data)+1 and all values are zero */
       array_repeat(0, int(array_max(data))+1),       
       /* increment the ith value of the Array by 1 if i == y */
       (acc, y) -> transform(acc, (x,i) -> IF(i=y, x+1, x))       
     ) as vars   

""").show(truncate=False)
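For readers less familiar with the SQL aggregate function, here is a plain-Python sketch of what the expression above computes, using functools.reduce as an analogue. The name `count_with_aggregate` is hypothetical, and the sketch only mirrors the logic; it says nothing about how Spark executes it.

```python
from functools import reduce

def count_with_aggregate(data):
    """Fold over data, rebuilding the accumulator array for every element."""
    zero = [0] * (max(data) + 1)  # array_repeat(0, array_max(data) + 1)

    def step(acc, y):
        # transform(acc, (x, i) -> IF(i = y, x + 1, x))
        return [x + 1 if i == y else x for i, x in enumerate(acc)]

    return reduce(step, data, zero)

print(count_with_aggregate([1, 2, 1, 2]))
```

Each element of data triggers a full pass over the accumulator, so the work grows with the product of the array length and the maximum value.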
jxc
  • this is an eye opener for me on how you can use filter inside a transform. great solution – murtihash Mar 04 '20 at 05:21
  • Thank you for this! It helps, but still blows up at the aggregate because the length of that list can be in the millions. Is there a way to introduce the Window function/partition by timestamp? – tanyabrown Mar 04 '20 at 23:53
  • BTW. what is the max value of array_max(data)? the millions of items in an array you mentioned are before the aggregation or after aggregation? – jxc Mar 05 '20 at 01:32
  • The value of array_max(data) can go up to a million. The transform expression is really the bottleneck on large datasets. Need to try and optimize that. – tanyabrown Mar 05 '20 at 23:39
  • @tanyabrown, I see the issue with the existing methods: if M is the number of items in an array and N is array_max(data), then the data to scan/compare for each timestamp is O(M*N), which is very inefficient for big M and N. It might be better to first explode the array, do the aggregation, and then create the array. This could be O(N) for each row. I will check this method after I get home. BTW, is it possible to create a SparseVector column with a fixed size rather than an ArrayType column with variable sizes for your task? – jxc Mar 06 '20 at 00:23
  • @jxc Thank you for your help! The output needs to be an ArrayType column, but the size can be fixed. Will give the explode solution a try. – tanyabrown Mar 06 '20 at 18:16
  • @tanyabrown, in such case, you might try CountVectorizer with a user-defined vocabulary (which is range(N+1), where N is the `max(array_max(data))`): http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizerModel.from_vocabulary. set `model = CountVectorizerModel.from_vocabulary([*map(str,range(N+1))], inputCol="data", outputCol="vec_vars")`. this will create a SparseVector. you will then need an udf to convert it into an ArrayType column, see https://stackoverflow.com/questions/38384347/how-to-split-vector-into-columns-using-pyspark. – jxc Mar 07 '20 at 00:37
  • @jxc Didn't realize we can do that. I'll try it out! Thanks. – tanyabrown Mar 09 '20 at 21:01
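
The cost difference discussed in the comments can be sketched in plain Python. Both function names below are hypothetical; the first mirrors the transform + filter/size approach, the second mirrors counting first and building the array afterwards.

```python
from collections import Counter

def dense_by_rescanning(data):
    """Rescan all M items for each of the N+1 candidate values: O(M*N)."""
    return [sum(1 for y in data if y == x) for x in range(max(data) + 1)]

def dense_by_counting_first(data):
    """Count once in O(M), then fill the N+1 slots in O(N)."""
    counts = Counter(data)
    return [counts[x] for x in range(max(data) + 1)]

sample = [1, 2, 3, 1, 2]
print(dense_by_rescanning(sample) == dense_by_counting_first(sample))
```

Both functions return the same list; only the amount of work differs, which is why counting before building the array scales better when M and N reach the millions.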