0

I need to create an event_id, basically a counter, grouping on multiple columns (v_id, d_id, ip, l_id) and incrementing it when delta > 40, to get output like this:

v_id d_id ip l_id delta event_id  last_event_flag
1     20   30 40   1    1           N 
1     20   30 40   2    1           N
1     20   30 40   3    1           N
1     20   30 40   4    1           Y
1     20   20 40   1    1           Y
1     30   30 40   2    1           N
1     30   30 40   3    1           N
1     30   30 40   4    1           N
1     30   30 40   5    1           Y

I was able to achieve this using a pandas DataFrame:

df['event_id'] = (df.delta >= 40.0).groupby([df.l_id, df.v_id, df.d_id, df.ip]).cumsum() + 1
df.append(df['event_id'], ignore_index=True)

but I get a memory error when running it on a larger dataset.

How can I do the same thing in PySpark?

user8617180

2 Answers

1

In pyspark you can do it using a window function:

First let's create the dataframe. Note that you can also directly load it as a dataframe from a csv:

df = spark.createDataFrame(
    sc.parallelize(
        [[1,20,30,40,1,1],
        [1,20,30,40,2,1],
        [1,20,30,40,3,1],
        [1,20,30,40,4,1],
        [1,20,30,40,45,2],
        [1,20,30,40,1,2],
        [1,30,30,40,2,1],
        [1,30,30,40,3,1],
        [1,30,30,40,4,1],
        [1,30,30,40,5,1]]
    ), 
    ["v_id","d_id","ip","l_id","delta","event_id"]
)

Your table has an implicit ordering, so we need to create a monotonically increasing id to preserve that order once Spark shuffles the rows:

import pyspark.sql.functions as psf
df = df.withColumn(
    "rn", 
    psf.monotonically_increasing_id()
)
    +----+----+---+----+-----+--------+----------+
    |v_id|d_id| ip|l_id|delta|event_id|        rn|
    +----+----+---+----+-----+--------+----------+
    |   1|  20| 30|  40|    1|       1|         0|
    |   1|  20| 30|  40|    2|       1|         1|
    |   1|  20| 30|  40|    3|       1|         2|
    |   1|  20| 30|  40|    4|       1|         3|
    |   1|  20| 30|  40|   45|       2|         4|
    |   1|  20| 30|  40|    1|       2|8589934592|
    |   1|  30| 30|  40|    2|       1|8589934593|
    |   1|  30| 30|  40|    3|       1|8589934594|
    |   1|  30| 30|  40|    4|       1|8589934595|
    |   1|  30| 30|  40|    5|       1|8589934596|
    +----+----+---+----+-----+--------+----------+
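The jump in `rn` from 4 to 8589934592 is expected. According to the Spark documentation, `monotonically_increasing_id` puts the partition index in the upper 31 bits and the record number within the partition in the lower 33 bits, so the ids are increasing and unique but not consecutive. A minimal pure-Python sketch of that layout (the helper name `split_monotonic_id` is hypothetical and not part of PySpark):

```python
# Number of low bits that hold the record number within a partition,
# as described in the Spark docs for monotonically_increasing_id.
RECORD_BITS = 33


def split_monotonic_id(value):
    """Split an id into (partition_index, record_number_within_partition)."""
    partition_index = value >> RECORD_BITS
    record_number = value & ((1 << RECORD_BITS) - 1)
    return partition_index, record_number


# Ids taken from the rn column shown above.
for rn in (0, 4, 8589934592, 8589934596):
    print(rn, split_monotonic_id(rn))
```

Here 8589934592 is exactly `1 << 33`, i.e. the first record of the second partition, which is why ordering by `rn` still reproduces the original row order in this example.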

Now to compute event_id and last_event_flag:

from pyspark.sql import Window
w1 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy("rn")
w2 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy(psf.desc("rn"))
df.withColumn(
    "event_id", 
    psf.sum((df.delta >= 40).cast("int")).over(w1) + 1
).withColumn(
    "last_event_flag", 
    psf.row_number().over(w2) == 1
).drop("rn")

    +----+----+---+----+-----+--------+---------------+
    |v_id|d_id| ip|l_id|delta|event_id|last_event_flag|
    +----+----+---+----+-----+--------+---------------+
    |   1|  20| 30|  40|    1|       1|          false|
    |   1|  20| 30|  40|    2|       1|          false|
    |   1|  20| 30|  40|    3|       1|          false|
    |   1|  20| 30|  40|    4|       1|          false|
    |   1|  20| 30|  40|   45|       2|          false|
    |   1|  20| 30|  40|    1|       2|           true|
    |   1|  30| 30|  40|    2|       1|          false|
    |   1|  30| 30|  40|    3|       1|          false|
    |   1|  30| 30|  40|    4|       1|          false|
    |   1|  30| 30|  40|    5|       1|           true|
    +----+----+---+----+-----+--------+---------------+
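As a sanity check on a small sample, the same logic can be mirrored in pandas. This is only a sketch for comparing results, not a replacement for the window functions; the variable names are illustrative, and it assumes the rows are already in the intended order:

```python
import pandas as pd

keys = ["v_id", "d_id", "l_id", "ip"]

# Same sample rows as in the Spark example (without the precomputed event_id).
sample = pd.DataFrame(
    [[1, 20, 30, 40, 1],
     [1, 20, 30, 40, 2],
     [1, 20, 30, 40, 3],
     [1, 20, 30, 40, 4],
     [1, 20, 30, 40, 45],
     [1, 20, 30, 40, 1],
     [1, 30, 30, 40, 2],
     [1, 30, 30, 40, 3],
     [1, 30, 30, 40, 4],
     [1, 30, 30, 40, 5]],
    columns=["v_id", "d_id", "ip", "l_id", "delta"],
)

# Running count of rows with delta >= 40 inside each group, starting at 1.
crossed = (sample["delta"] >= 40).astype(int)
sample["event_id"] = crossed.groupby([sample[k] for k in keys]).cumsum() + 1

# The last row of each group is the one that has no later duplicate of its keys.
sample["last_event_flag"] = ~sample.duplicated(subset=keys, keep="last")

print(sample)
```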
MaFF
  • Hi Marie, thanks for the answer, I will try that. I also need to add one more flag (last_event_flag) to identify the last event in that group. Any suggestions? – user8617180 Sep 20 '17 at 18:58
  • You modified your sample data; was that intentional? – MaFF Sep 20 '17 at 19:04
  • I added the computation of `last_event_flag` without modifying the data sample. I hope it helps, don't forget to mark your question as solved – MaFF Sep 20 '17 at 19:10
0

Perhaps you should filter with df = df[df.delta >= 40] before running the groupby, though I'm not sure whether that matters.

You can also look into the chunksize parameter of read_csv, which lets you process the CSV in chunks for memory efficiency. For example, you might break the data into chunks of 10000 lines and run the calculation on each chunk to avoid the memory error.

https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html

How to read a 6 GB csv file with pandas
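A rough sketch of the chunked approach follows. One thing to watch for is that a group can span several chunks, so the running count has to be carried from one chunk to the next. The function name `add_event_id_chunked`, the inline CSV text, and the column layout are assumptions made for illustration:

```python
import io

import pandas as pd

KEYS = ["v_id", "d_id", "ip", "l_id"]


def add_event_id_chunked(source, chunksize=10000, threshold=40):
    """Yield chunks of the CSV with an event_id column added."""
    running = {}  # group key -> number of rows with delta >= threshold so far
    for chunk in pd.read_csv(source, chunksize=chunksize):
        hit = (chunk["delta"] >= threshold).astype(int)
        # Running count inside this chunk only.
        within = hit.groupby([chunk[k] for k in KEYS]).cumsum()
        # Count carried over from earlier chunks, looked up per row.
        group_keys = list(zip(*(chunk[k] for k in KEYS)))
        carried = pd.Series(
            [running.get(g, 0) for g in group_keys], index=chunk.index
        )
        chunk["event_id"] = within + carried + 1
        # Update the carried counts for the next chunk.
        for g, h in zip(group_keys, hit):
            running[g] = running.get(g, 0) + h
        yield chunk


csv_text = """v_id,d_id,ip,l_id,delta
1,20,30,40,1
1,20,30,40,2
1,20,30,40,3
1,20,30,40,4
1,20,30,40,45
1,20,30,40,1
1,30,30,40,2
1,30,30,40,3
1,30,30,40,4
1,30,30,40,5
"""

# A tiny chunksize is used here only to force a group to span two chunks.
result = pd.concat(add_event_id_chunked(io.StringIO(csv_text), chunksize=5))
print(result)
```

Only one chunk plus the small `running` dictionary is held in memory at a time, provided each yielded chunk is written out (for example appended to a file) rather than concatenated as in this demo. The per-row Python loops keep the sketch simple but will be slow on very large files.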