1

The dataframe is already sorted out by date,

col1 ==1 value is unique,

and col1==1 is passed, it will increase increment by 1 (eg. 1,2,3,4,5,6,7...) and only the -1 are duplicates.

I have a dataframe looks like this call it df

TEST_schema = StructType([StructField("date", StringType(), True),\
                          StructField("col1", IntegerType(), True),\
                          StructField("col2", IntegerType(), True)])
TEST_data = [('2020-08-01',-1,-1),('2020-08-02',-1,-1),('2020-08-03',-1,3),('2020-08-04',-1,2),('2020-08-05',1,4),\
             ('2020-08-06',2,1),('2020-08-07',3,2),('2020-08-08',4,3),('2020-08-09',5,-1)]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df.show()



+--------+----+----+
    date |col1|col2|
+--------+----+----+
2020-08-01| -1|  -1|
2020-08-02| -1|  -1|
2020-08-03| -1|   3|
2020-08-04| -1|   2|
2020-08-05| 1 |   4|
2020-08-06| 2 |   1|
2020-08-07| 3 |   2|
2020-08-08| 4 |   3|
2020-08-09| 5 |  -1|
+--------+----+----+

The condition is when col1 == 1, then we start adding backwards from col2 ==4, (eg. 4,5,6,7,8,...) and the after col2 == 4 return 0 all the way (eg. 4,0,0,0,0...)

So, my resulted df will look something like this.

   +--------+----+----+----+
        date |col1|col2|want
    +--------+----+----+----+
    2020-08-01| -1|  -1|  8 |
    2020-08-02| -1|  -1|  7 |
    2020-08-03| -1|   3|  6 |
    2020-08-04| -1|   2|  5 |
    2020-08-05| 1 |   4|  4 |
    2020-08-06| 2 |   1|  0 |
    2020-08-07| 3 |   2|  0 |
    2020-08-08| 4 |   3|  0 |
    2020-08-09| 5 |  -1|  0 |
   +---------+----+----+----+  

Enhancement: I want to add additional condition where col2 == -1 when col1 == 1 (at 2020-08-05), and col2 == -1 goes consecutive.. then I want to count consecutive -1, and then add where the consecutive breaks col2 == ? value. so here's an example to clear.

    +--------+----+----+----+
        date |col1|col2|want
    +--------+----+----+----+
    2020-08-01| -1|  -1|  11|
    2020-08-02| -1|  -1|  10|
    2020-08-03| -1|   3|  9 |
    2020-08-04| -1|   2|  8 |
    2020-08-05| 1 |  -1|  7*|
    2020-08-06| 2 |  -1|  0 |
    2020-08-07| 3 |  -1|  0 |
    2020-08-08| 4 |  4*|  0 |
    2020-08-09| 5 |  -1|  0 |
   +---------+----+----+----+  

so, we see 3 consecutive -1s, (starting from 2020-08-05, we only care about first consecutive -1s) and after the consecutive we have 4 (at 2020-08-08 denoted as *), then we would have 4+ 3 =7 at the col1 ==1 row. is it possible?

** MY 1ST ATTEMPT **

TEST_df = TEST_df.withColumn('cumsum', sum(when( col('col1') < 1, col('col1') ) \
                 .otherwise( when( col('col1') == 1, 1).otherwise(0))).over(Window.partitionBy('col1').orderBy().rowsBetween(-sys.maxsize, 0)))
TEST_df.show()

+----------+----+----+------+
|      date|col1|col2|cumsum|
+----------+----+----+------+
|2020-08-01|  -1|  -1|    -1|
|2020-08-02|  -1|  -1|    -2|
|2020-08-03|  -1|   3|    -3|
|2020-08-04|  -1|   2|    -4|
|2020-08-05|   1|   4|     1|
|2020-08-07|   3|   2|     0|
|2020-08-09|   5|  -1|     0|
|2020-08-08|   4|   3|     0|
|2020-08-06|   2|   1|     0|
+----------+----+----+------+

w1 = Window.orderBy(desc('date'))
w2 =Window.partitionBy('case').orderBy(desc('cumsum'))

TEST_df.withColumn('case', sum(when( (col('cumsum') == 1) & (col('col2') != -1) , col('col2')) \
       .otherwise(0)).over(w1)) \
  .withColumn('rank', when(col('case') != 0, rank().over(w2)-1).otherwise(0)) \
  .withColumn('want', col('case') + col('rank')) \
  .orderBy('date') \
+----------+----+----+------+----+----+----+
|date      |col1|col2|cumsum|case|rank|want|
+----------+----+----+------+----+----+----+
|2020-08-01|-1  |-1  |-1    |4   |1   |5   |
|2020-08-02|-1  |-1  |-2    |4   |2   |6   |
|2020-08-03|-1  |3   |-3    |4   |3   |7   |
|2020-08-04|-1  |2   |-4    |4   |4   |8   |
|2020-08-05|1   |4   |1     |4   |0   |4   |
|2020-08-06|2   |1   |0     |0   |0   |0   |
|2020-08-07|3   |2   |0     |0   |0   |0   |
|2020-08-08|4   |3   |0     |0   |0   |0   |
|2020-08-09|5   |-1  |0     |0   |0   |0   |
+----------+----+----+------+----+----+----+

You see that rank 1,2,3,4 if I can make it 4,3,2,1 it will look like my resulted dataframe.... how to reverse it? i tried both orderby asc, and desc... and of course this is before the enhancement

hellotherebj
  • 121
  • 8

1 Answers1

1

IIUC, you can try the following:

  1. groupby and create a collect_list of all related rows(vals in below code), sort the list by date in desencending order (Note: change groupby(lit(1)) to whatever columns you can use to divide your data into independent subset.

  2. find the array index idx which has col1 == 1

  3. if col2==-1 at idx, then find the offset from idx to the beginning of the list with the first row having col2 != -1 (Note: in the current code, offset might be NULL if all col2 before idx are -1, you will have to decide what you want. for example use coalesce(IF(...),0))

  4. after we have offset and idx, the want column can be calculated by:

    IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx)
    
  5. use SparkSQL function inline to explode the array of structs.

Note: The same logic can be applied using Window function in case too many columns exist in your production dataframe.

Code below:

from pyspark.sql.functions import sort_array, collect_list, struct, expr, lit

TEST_df = spark.createDataFrame([
  ('2020-08-01', -1, -1), ('2020-08-02', -1, -1), ('2020-08-03', -1, 3),
  ('2020-08-04', -1, 2), ('2020-08-05', 1, -1), ('2020-08-06', 2, -1),
  ('2020-08-07', 3, -1), ('2020-08-08', 4, 4), ('2020-08-09', 5, -1)
], ['date', 'col1', 'col2'])

# list of column used in calculation
cols = ["date", "col1", "col2"]

df_new = TEST_df \
    .groupby(lit(1)) \
    .agg(sort_array(collect_list(struct(*cols)),False).alias('vals')) \
    .withColumn('idx', expr("filter(sequence(0,size(vals)-1), i -> vals[i].col1=1)[0]")) \
    .withColumn('offset', expr("""
        coalesce(IF(vals[idx].col2=-1, filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0],0),0)
     """)).selectExpr("""
       inline(
         transform(vals, (x,i) -> named_struct(
             'dta', x,
             'want', IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx)
           )
         )
    )""").select('dta.*', 'want')

Output:

df_new.orderBy('date').show()
+----------+----+----+----+
|      date|col1|col2|want|
+----------+----+----+----+
|2020-08-01|  -1|  -1|  11|
|2020-08-02|  -1|  -1|  10|
|2020-08-03|  -1|   3|   9|
|2020-08-04|  -1|   2|   8|
|2020-08-05|   1|  -1|   7|
|2020-08-06|   2|  -1|   0|
|2020-08-07|   3|  -1|   0|
|2020-08-08|   4|   4|   0|
|2020-08-09|   5|  -1|   0|
+----------+----+----+----+

Edit: Per comments, added an alternative to use Window aggregate function instead of groupby:

from pyspark.sql import Window

# WindowSpec to cover all related Rows in the same partition
w1 = Window.partitionBy().orderBy('date').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

cols = ["date", "col1", "col2"]

# below `cur_idx` is the index for the current Row in array `vals`
df_new = TEST_df.withColumn('vals', sort_array(collect_list(struct(*cols)).over(w1),False)) \
    .withColumn('idx', expr("filter(sequence(0,size(vals)-1), i -> vals[i].col1=1)[0]")) \
    .withColumn('offset', expr("IF(vals[idx].col2=-1, filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0],0)")) \
    .withColumn("cur_idx", expr("array_position(vals, struct(date,col1,col2))-1")) \
    .selectExpr(*TEST_df.columns, "IF(cur_idx<idx, 0, vals[idx-offset].col2 + offset + cur_idx - idx) as want")
jxc
  • 13,553
  • 4
  • 16
  • 34
  • Thank you for your hard work jxc... example use coalesce(IF(...),0) for this one, where do I put coalesce condition? and yes I do want it like offset =0 if col2 before idx are -1 – hellotherebj Aug 07 '20 at 17:40
  • @hellotherebj theoretically, it should be used to the output of the `filter` statement, but the end-result should be the same: `IF(vals[idx].col2=-1, coalesce(filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0],0),0)` – jxc Aug 07 '20 at 18:04
  • Thank you. it makes sense. could you please have look at my new question? it's somewhat similiar to this.. https://stackoverflow.com/questions/63308490/pyspark-how-to-code-complicated-dataframe-calculation-lead-sum – hellotherebj Aug 07 '20 at 20:02
  • Hey jxc, I posted another hard problem... I can't seem to do this myself so i need your help.. https://stackoverflow.com/questions/63384238/pyspark-how-to-code-complicated-dataframe-algorithm-problem-summing-with-condi – hellotherebj Aug 12 '20 at 20:25
  • hey jxd, hope you had a great weekend. what if I want to include 10 columns, do I have to do something like cols = ['col0', 'col1', .... , 'col10'] like this or is there anyway I can do this faster? like '*' or something. and that means inline needs to include all the 10 columns as well right? – hellotherebj Aug 18 '20 at 02:37
  • how many columns are used in the calculation? if only 3 columns, then, you can probably try using Window function instead of groupby. @hellotherebj – jxc Aug 18 '20 at 02:43
  • I have maybe around 14 columns. so my final output would contain 15 columns including 'want' column. – hellotherebj Aug 18 '20 at 02:44
  • if only 2 columns (out of total 14) are used in calculating `idx` and `offset`, then it's probably simpler to use Window function. the `date` column must be the first of the list as it's essential to sort the data by date. – jxc Aug 18 '20 at 02:59
  • yes only 2 columns are used for the calculation and keeping all the rest of the columns.. so you mean instead of selectexpr, i use withColumn and use window function correct? – hellotherebj Aug 18 '20 at 03:02
  • like 'withColumn('want' expr("IF(i – hellotherebj Aug 18 '20 at 03:05
  • @hellotherebj, added an alternative solution, please check the `Edit` section in my post. – jxc Aug 18 '20 at 03:19
  • 1
    if you want to keep the original method, we have to include all columns in named_struct() inside the inline function. Just do it using Python format: `.selectExpr(""" inline(transform(vals, (x,i) -> named_struct({}, 'want', IF(i – jxc Aug 18 '20 at 03:41
  • i got an error it occured at selectExpr : pyspark.sql.utils.AnalysisException: "Undefined function: 'demand_u_'. this function is neither a registered temporary function nor a permanent function registered in the database 'default'.;.... demand_u is one of the column name – hellotherebj Aug 18 '20 at 03:56
  • it worked after i got rid of that demand_u column. hm – hellotherebj Aug 18 '20 at 04:04
  • if column name contains special characters like SPACE, dot etc, *TEST_df will have issue. can you check changing `*TEST_df` in the selectExpr to `*["\`{}\`".format(c) for c in TEST_df.columns]`. just to enclose each column name with back-ticks. – jxc Aug 18 '20 at 04:12
  • thank you, but when i do .show() it takes too much time to show in console.. should I set repartition somewhere? – hellotherebj Aug 18 '20 at 04:37
  • jxc could you have a look at my new problem? thanks in advance. https://stackoverflow.com/questions/63475831/pyspark-how-to-lag-with-offset – hellotherebj Aug 18 '20 at 20:02
  • jxc, ive posted new problem.. it's rounding problem!https://stackoverflow.com/questions/63527697/pyspark-how-to-round-up-or-down-round-to-the-nearest – hellotherebj Aug 21 '20 at 17:46
  • hey jxc long time no see~~ I've posted new problem.. this might be long >< https://stackoverflow.com/questions/64084727/pyspark-how-to-solve-complicated-dataframe-logic-plus-join – hellotherebj Sep 27 '20 at 04:13
  • please take a look sir :) – hellotherebj Sep 27 '20 at 04:14
  • is it possible to calculate without window function? – hellotherebj Oct 06 '20 at 15:42
  • just wondering if we can get rid of this and do the same calculation# WindowSpec to cover all related Rows in the same partition w1 = Window.partitionBy().orderBy('date').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing) – hellotherebj Oct 06 '20 at 15:53
  • or without using this array – hellotherebj Oct 06 '20 at 17:17
  • @hellotherebj, you can probably use pandas_udf if you can sort out the logic using pandas. Arrays since Spark2.4 provide some very powerful functionalities to deal with complex logic w/o using udf. but it also has limitations especially when the function requires groupby/window-partition while your data are skewed. – jxc Oct 06 '20 at 17:32
  • could you show what the logic would look like with udf? we can use groupby/partition still but I want to avoid using sort_arrays – hellotherebj Oct 06 '20 at 17:47
  • @hellotherebj, after groupby and repartition, the order of data rows to the same group is not guaranteed, so the sort_array is not optional. even with udf, it will be much easier to handle sorted data. – jxc Oct 06 '20 at 18:14
  • I managed to do it by populating extra columns and window function, just FYI @jxc – hellotherebj Oct 06 '20 at 22:26
  • @hellotherebj, remove `sort_array` from the original code might just fine as the WinSpec is already sorted by `Date` based on the link I added in another comment. – jxc Oct 06 '20 at 22:48