
I am converting plain R code into SparkR to make efficient use of Spark.

I have the below column CloseDate.

CloseDate
2011-01-08
2011-02-07
2012-04-07
2013-04-18
2011-02-07
2010-11-10
2010-12-09
2013-02-18
2010-12-09
2011-03-11
2011-04-10
2013-06-19
2011-04-10
2011-01-06
2011-02-06
2013-04-16
2011-02-06
2015-09-25
2015-09-25
2010-11-10

I want to count the number of times the date has increased or decreased from one row to the next. I have the below R code to do that.

dateChange <- function(closeDate, dir){
  close_dt <- as.Date(closeDate)
  num_closedt_out <- 0   # times the date moved later (increase)
  num_closedt_in  <- 0   # times the date moved earlier (decrease)

  for (j in 1:length(close_dt))
  {
    curr <- close_dt[j]
    if (j > 1)
      prev <- close_dt[j - 1]
    else
      prev <- curr
    if (curr > prev){
      num_closedt_out <- num_closedt_out + 1
    }
    else if (curr < prev){
      num_closedt_in <- num_closedt_in + 1
    }
  }
  if (dir == "inc")
    ret <- num_closedt_out
  else if (dir == "dec")
    ret <- num_closedt_in
  ret
}
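
For example, on the CloseDate values above read in as a character vector, the function should report 11 increases and 7 decreases (the two equal consecutive dates count as neither):

# Sample data taken from the CloseDate column above
closeDate <- c("2011-01-08", "2011-02-07", "2012-04-07", "2013-04-18",
               "2011-02-07", "2010-11-10", "2010-12-09", "2013-02-18",
               "2010-12-09", "2011-03-11", "2011-04-10", "2013-06-19",
               "2011-04-10", "2011-01-06", "2011-02-06", "2013-04-16",
               "2011-02-06", "2015-09-25", "2015-09-25", "2010-11-10")

dateChange(closeDate, "inc")   # row-to-row increases: 11
dateChange(closeDate, "dec")   # row-to-row decreases: 7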

I tried to use a SparkR df$col here. Since Spark executes the code lazily, I didn't get the value of the length during execution and got a NaN error.

Here is the modified code that I tried.

DateDirChanges <- function(closeDate, dir){
  close_dt <- to_date(closeDate)        # returns a Column expression, not data
  num_closedt_out <- 0
  num_closedt_in  <- 0

  col_len <- SparkR::count(close_dt)    # also a Column expression, not a number
  for (j in 1:col_len)                  # fails here: col_len has no concrete value
  {
    curr <- close_dt[j]                 # a Column cannot be indexed row by row
    if (j > 1)
      prev <- close_dt[j - 1]
    else
      prev <- curr
    if (curr > prev){
      num_closedt_out <- num_closedt_out + 1
    }
    else if (curr < prev){
      num_closedt_in <- num_closedt_in + 1
    }
  }
  if (dir == "inc")
    ret <- num_closedt_out
  else if (dir == "dec")
    ret <- num_closedt_in
  ret
}

How can I get the length of a column during the execution of this code? Or is there any other, better way to do it?

sag

1 Answer


You cannot, because a Column simply has no length. Unlike what you may expect coming from R, columns don't represent data but SQL expressions and specific data transformations. Moreover, the order of values in a Spark DataFrame is arbitrary, so you cannot simply look at the neighboring rows.

If the data can be partitioned, as in your previous question, you can use window functions in the same way as I've shown in the answer to that question. Otherwise there is no efficient way to handle this using SparkR alone.

Assuming there is a way to determine order (required) and you can partition your data (desirable for reasonable performance), all you need is something like this:

SELECT
   CAST(LAG(CloseDate, 1) OVER w > CloseDate AS INT) gt,
   CAST(LAG(CloseDate, 1) OVER w < CloseDate AS INT) lt,
   CAST(LAG(CloseDate, 1) OVER w = CloseDate AS INT) eq
FROM DF
WINDOW w AS (
  PARTITION BY partition_col ORDER BY order_col
)
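
A minimal SparkR sketch of how this query could be wired up and aggregated, assuming the Spark 2.x SparkR API (createOrReplaceTempView, sql); DF, partition_col and order_col are placeholders for your actual table and columns, and the gt/lt flags are renamed to decreased/increased for readability:

# Minimal sketch; df is assumed to be an existing SparkDataFrame with a
# CloseDate column plus the partitioning and ordering columns.
library(SparkR)

createOrReplaceTempView(df, "DF")

flags <- sql("
  SELECT
    partition_col,
    CAST(LAG(CloseDate, 1) OVER w < CloseDate AS INT) AS increased,
    CAST(LAG(CloseDate, 1) OVER w > CloseDate AS INT) AS decreased
  FROM DF
  WINDOW w AS (PARTITION BY partition_col ORDER BY order_col)
")

# Sum the 0/1 flags per partition to count increases and decreases;
# the first row of each partition has a NULL lag and is ignored by sum().
counts <- agg(
  groupBy(flags, flags$partition_col),
  alias(sum(flags$increased), "num_increases"),
  alias(sum(flags$decreased), "num_decreases")
)
head(counts)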
zero323
  • I think I can partition the data as well. But there we used datediff to get the desired output, whereas here I need to write a custom function. It should check whether the value increases or decreases from its LAG and return the number of times the value increased or decreased. So this custom function has to read the data to create a new column. Is there any possibility to do that? – sag Apr 07 '16 at 09:38
  • As long as you have a way to determine order (required) and partitioning (for performance) it is pretty easy. – zero323 Apr 07 '16 at 09:43
  • This is exactly similar to the previous question. I just did persist into tempTable and got the lag. There we use datediff to get the difference. Here we need to write something like getIncrementCount(df$closeDate, df$lagCloseDate). In that function I need to iterate through and maintain a count. This count has to be increased by one everytime when the closeDate is more than lagCloseDate. I did refer some the default functions provided by SparkR, but all of them are calling a java to do that. Is it possible to it R? Sorry if the question is too dumb. I am very new to R and SparkR – sag Apr 07 '16 at 10:14