You could try this. I basically generate two columns first(first non null value=110) and stock2 which is basically incremental sum of stock and then subtract them from each other to get your desired stock.
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
w2=Window().partitionBy("item","store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock2", F.sum("sales_qty").over(w)- F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first")-F.col("stock2"))\
.drop("stock1","stock2","first")\
.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| 110|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 0| 109|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| 108|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 0| 108|
|673895|35578| 20180109| 0| 108|
|673895|35578| 20180110| 1| 107|
+------+-----+---------+---------+-----+
If you would like to force your first value to null instead of 110(as shown in your desired output) you could use this.(basically uses rownumber to replace that first 110 value with null) :
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
w2=Window().partitionBy("item","store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock2", F.sum("sales_qty").over(w)- F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first")-F.col("stock2"))\
.withColumn("num", F.row_number().over(w))\
.withColumn("stock", F.when(F.col("num")==1, F.lit(None)).otherwise(F.col("stock")))\
.drop("stock1","stock2","first","num")\
.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 0| 109|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| 108|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 0| 108|
|673895|35578| 20180109| 0| 108|
|673895|35578| 20180110| 1| 107|
+------+-----+---------+---------+-----+
Additional data INPUT and OUTPUT:
#input1
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 3| null|
|673895|35578| 20180105| 0| 109|
|673895|35578| 20180106| 1| null|
|673895|35578| 20180107| 0| 108|
|673895|35578| 20180108| 4| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
#output1
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 3| 106|
|673895|35578| 20180105| 0| 106|
|673895|35578| 20180106| 1| 105|
|673895|35578| 20180107| 0| 105|
|673895|35578| 20180108| 4| 101|
|673895|35578| 20180109| 0| 101|
|673895|35578| 20180110| 1| 100|
+------+-----+---------+---------+-----+
#input2
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 7| null|
|673895|35578| 20180105| 0| 102|
|673895|35578| 20180106| 0| null|
|673895|35578| 20180107| 4| 98|
|673895|35578| 20180108| 0| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
#output2
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 7| 102|
|673895|35578| 20180105| 0| 102|
|673895|35578| 20180106| 0| 102|
|673895|35578| 20180107| 4| 98|
|673895|35578| 20180108| 0| 98|
|673895|35578| 20180109| 0| 98|
|673895|35578| 20180110| 1| 97|
+------+-----+---------+---------+-----+
IF, the stock
quantities are NOT continuous like this:
df.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| null|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| null|
|673895|35578| 20180104| 7| null|
|673895|35578| 20180105| 0| 112|
|673895|35578| 20180106| 2| null|
|673895|35578| 20180107| 0| 107|
|673895|35578| 20180108| 0| null|
|673895|35578| 20180109| 0| null|
|673895|35578| 20180110| 1| null|
+------+-----+---------+---------+-----+
You could use this:(i basically compute a dynamic window for each non-null last)
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
w3=Window().partitionBy("item","store","stock5").orderBy("timestamp")
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock4", F.when(F.col("stock1")!=0, F.rank().over(w)).otherwise(F.col("stock1")))\
.withColumn("stock5", F.sum("stock4").over(w))\
.withColumn("stock6", F.sum("stock1").over(w3))\
.withColumn("sum", F.sum(F.when(F.col("stock1")!=F.col("stock6"),F.col("sales_qty")).otherwise(F.lit(0))).over(w3))\
.withColumn("stock2", F.when(F.col("sales_qty")!=0, F.col("stock6")-F.col("sum")).otherwise(F.col("stock")))\
.withColumn("stock", F.when((F.col("stock2").isNull())&(F.col("sales_qty")==0),F.col("stock6")-F.col("sum")).otherwise(F.col("stock2")))\
.drop("stock1","stock4","stock5","stock6","sum","stock2")\
.show()
+------+-----+---------+---------+-----+
| item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101| 1| 0|
|673895|35578| 20180102| 0| 110|
|673895|35578| 20180103| 1| 109|
|673895|35578| 20180104| 7| 102|
|673895|35578| 20180105| 0| 112|
|673895|35578| 20180106| 2| 110|
|673895|35578| 20180107| 0| 107|
|673895|35578| 20180108| 0| 107|
|673895|35578| 20180109| 0| 107|
|673895|35578| 20180110| 1| 106|
+------+-----+---------+---------+-----+