
I'm quite new to pyspark and not a skilled Python engineer, and I'm trying to understand how to apply a pandas UDF to my case. I have developed an ARIMAX model which, for each "id", produces forecasts for 4 outlooks (M1 to M4 ahead). For each outlook, 12 models are run: for each of the last 12 months, the previous 36 months are used to predict 1 (or 2, 3, 4) months ahead. Once done, an RMSE is calculated over the 12 forecasts made for each of the 4 outlooks.

The core logic can be sketched as follows:

for id in id_list:
    # filter df for this id & do some calcs...

    for outlook in range(0, 4):
        # prepare df for this outlook horizon (M1..M4)...

        for per in per_range:
            # perform modelling for each of the 12 monthly periods;
            # essentially a 1-period window movement over the history
            ...

        # calc RMSE over the 12 forecasts as a preliminary result
        # & append the forecast details to a Delta table

In between, I have various checks on the relevancy of the history, the choice of exogenous variable, etc.

The problem is that running the 4 forecast outlooks for one id takes about 1 minute, and I now have 2000 ids to run, which means a ~30+ hour job on a single-node cluster.

Therefore, I would like to create a pandas UDF to parallelize the job and reduce the runtime, but I'm unsure how to handle the nested loops and the window movement in the most efficient manner.
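
Roughly, what I imagine the parallelized version could look like is sketched below: group by id and move the whole nested-loop logic into the grouped function. The column names month and value, the helper name run_forecasts_for_id, and the naive forecast standing in for the ARIMAX fit are placeholders for my actual logic.

import pandas as pd
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType)

# output of the grouped function: one RMSE per (id, outlook)
result_schema = StructType([
    StructField("id", StringType()),
    StructField("outlook", IntegerType()),
    StructField("rmse", DoubleType()),
])

def run_forecasts_for_id(pdf: pd.DataFrame) -> pd.DataFrame:
    # Spark hands this function the full history of ONE id as a pandas DataFrame,
    # so the existing nested loops can stay here as plain pandas code.
    pdf = pdf.sort_values("month")
    rows = []
    for outlook in range(1, 5):                  # M1..M4 ahead
        sq_errors = []
        for back in range(1, 13):                # the 12 monthly scoring windows
            gap = back + outlook - 1             # training must end `outlook` months before the target
            train = pdf.iloc[-(gap + 36):-gap]   # previous 36 months of history
            actual = pdf["value"].iloc[-back]
            # >>> the real ARIMAX fit/forecast on `train` goes here;
            # a naive last-value forecast is used only so the sketch runs end to end
            forecast = train["value"].iloc[-1]
            sq_errors.append((forecast - actual) ** 2)
        rows.append({"id": pdf["id"].iloc[0],
                     "outlook": outlook,
                     "rmse": float((sum(sq_errors) / len(sq_errors)) ** 0.5)})
    return pd.DataFrame(rows)

# one Spark task per id instead of a sequential Python loop over ~2000 ids
forecasts = df.groupBy("id").applyInPandas(run_forecasts_for_id, schema=result_schema)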

I would appreciate any direction on what the best course of action is, so that the solution is not overkill for my skill set (or my time, since this is on top of business as usual). Maybe it's a link to learning material I need to go through first, etc.

Thank you very much in advance!

I tried searching online and here, but couldn't find an adequate example.

Edit: After some research, and with a "push" from Chris (thank you), I found out that:

I was not able to figure out how to elegantly pass parameters to the UDF (like one value per id), so I added them as a DataFrame column on the input; this is not nice, as the column repeats the same value, but it is reachable inside the UDF for me. A sketch of that workaround is below.
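
For reference, a minimal sketch of the workaround, assuming a small params table with one value per id (the parameter name alpha and the params table itself are just placeholders):

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

out_schema = StructType([
    StructField("id", StringType()),
    StructField("used_alpha", DoubleType()),
])

# params: one row per id, e.g. columns (id, alpha); joining it onto the input
# repeats the value on every row of that id, which is what the UDF can see
df_with_params = df.join(params, on="id", how="left")

def forecast_one_id(pdf: pd.DataFrame) -> pd.DataFrame:
    # the parameter is constant within the group, so take it from the first row
    alpha = float(pdf["alpha"].iloc[0])
    # ... run the per-id modelling here using `alpha` ...
    return pd.DataFrame([{"id": pdf["id"].iloc[0], "used_alpha": alpha}])

result = df_with_params.groupBy("id").applyInPandas(forecast_one_id, schema=out_schema)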

  • pyspark has an `applyInPandas` method. Here is an example of how to train thousands of prophet models using this method. It should translate easily to your use case. https://www.databricks.com/blog/2021/04/06/fine-grained-time-series-forecasting-at-scale-with-facebook-prophet-and-apache-spark-updated-for-spark-3.html – Chris Mar 09 '23 at 17:30
  • Thanks @Chris, I've seen this one and it probably helped me write down the issue, as I think I follow it better now - do I get it right that I need to prepare my partition by grouping on id and the new outlook_id, period_id? Meaning my problem is in the way I compose the data before grouping, and I need to reflect all the combinations I'm interested in beforehand (e.g. for one per_id, let's say Jan 23, I would need to have in the dataframe a part containing 4 times (4 forecast outlooks) 36 months (rows) of history, expecting 37M (rows) on output (1M forecasted))? – KubaS Mar 10 '23 at 08:57
  • You could have several functions. The first could be responsible for 4 month forecasts for each ID, in which case you'd group by ID and apply that function. The resulting dataframe could then be joined back to the original data and fed into the scoring function. If the scoring function is too slow because of the 12 jobs, you could in theory replicate the data with a lag indicator (1-12) and group by ID, lag in order to inform the scoring function which lag to use. I doubt that would be necessary and could be slower if the data is too small, but worth thinking about. – Chris Mar 10 '23 at 14:17
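
If I read Chris's last suggestion correctly, the lag replication could look roughly like this (score_one_window, the month/value column names, and the naive forecast standing in for the ARIMAX model are placeholders):

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

score_schema = StructType([
    StructField("id", StringType()),
    StructField("lag", LongType()),
    StructField("sq_error", DoubleType()),
])

def score_one_window(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf = full history of one id, tagged with one lag value (1..12);
    # fit on the 36 months ending before the held-out month and score it
    lag = int(pdf["lag"].iloc[0])
    pdf = pdf.sort_values("month")
    train = pdf.iloc[-(lag + 36):-lag]
    actual = pdf["value"].iloc[-lag]
    forecast = train["value"].iloc[-1]               # placeholder for the ARIMAX forecast
    return pd.DataFrame([{"id": pdf["id"].iloc[0], "lag": lag,
                          "sq_error": float((forecast - actual) ** 2)}])

lags = spark.range(1, 13).withColumnRenamed("id", "lag")   # the 12 scoring windows
replicated = df.crossJoin(lags)                            # each id's history repeated per lag
scored = replicated.groupBy("id", "lag").applyInPandas(score_one_window, schema=score_schema)
rmse_per_id = scored.groupBy("id").agg(F.sqrt(F.avg("sq_error")).alias("rmse"))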

0 Answers