
I'm trying to port some code from R to Scala to perform customer analysis. I have already computed the Recency, Frequency and Monetary factors on Spark into a DataFrame.

Here is the schema of the DataFrame:

df.printSchema 
root
 |-- customerId: integer (nullable = false)
 |-- recency: long (nullable = false)
 |-- frequency: long (nullable = false)
 |-- monetary: double (nullable = false)

And here is a data sample as well:

df.order($"customerId").show 

+----------+-------+---------+------------------+
|customerId|recency|frequency|          monetary|
+----------+-------+---------+------------------+
|         1|    297|      114|            733.27|
|         2|    564|       11|            867.66|
|         3|   1304|        1|             35.89|
|         4|    287|       25|            153.08|
|         6|    290|       94|           316.772|
|         8|   1186|        3|            440.21|
|        11|    561|        5|            489.70|
|        14|    333|       57|            123.94|
+----------+-------+---------+------------------+

I'm trying to find, for each column, the interval on a quantile vector that contains each value, given a vector of probability breakpoints.

In other words, given a vector of non-decreasing breakpoints (in my case the quantile vector), find the interval containing each element of x;

i.e. (pseudo-code):

if i <- findInterval(x, v), then for each index j in x:
    v[i[j]] ≤ x[j] < v[i[j] + 1],  where v[0] := -Inf, v[N+1] := +Inf, and N <- length(v).
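
For clarity, here is a plain-Scala sketch of those semantics (a toy helper just to pin down what I'm after, not Spark code):

// For each x, return the number of breakpoints v(i) with v(i) <= x
// (0 when x is below the first breakpoint). v is assumed sorted ascending.
def findInterval(xs: Seq[Double], v: Seq[Double]): Seq[Int] =
  xs.map(x => v.count(_ <= x))

findInterval(Seq(297.0, 1304.0, 35.0), Seq(100.0, 500.0, 1000.0))
// => Seq(1, 3, 0)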

In R, this translates to the following code:

probSegment <- c(0.0, 0.25, 0.50, 0.75, 1.0)

RFM_table$Rsegment <- findInterval(RFM_table$Recency, quantile(RFM_table$Recency, probSegment)) 
RFM_table$Fsegment <- findInterval(RFM_table$Frequency, quantile(RFM_table$Frequency, probSegment)) 
RFM_table$Msegment <- findInterval(RFM_table$Monetary, quantile(RFM_table$Monetary, probSegment))

I'm kind of stuck with the quantile function though.

In an earlier discussion with @zero323, he suggested that I use the percentRank window function, which can be used as a shortcut. I'm not sure that I can apply percentRank in this case.

How can I apply a quantile function to a DataFrame column with Scala and Spark? If that is not possible, can I use the percentRank function instead?

Thanks.

– eliasah

2 Answers


Well, I still believe that percent_rank is good enough here. The percent_rank window function is computed as:

percent_rank = (rank - 1) / (N - 1)

where N is the number of rows in the partition. Let's define pr as:

pr = (x - 1) / (N - 1)

where x is the rank of a given row. Transforming this as follows:

pr * (N - 1) = x - 1

x = pr * (N - 1) + 1

gives a definition of a percentile used, according to Wikipedia, by Microsoft Excel.
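
For example, with N = 8 rows and pr = 0.25 this gives x = 0.25 * 7 + 1 = 2.75, i.e. the first quartile falls three quarters of the way between the 2nd and 3rd values in sorted order, which is exactly how Excel's PERCENTILE interpolates.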

So the only thing you really need is a findInterval UDF which will return the correct interval index. Alternatively you can use rank directly and match on rank ranges.
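
For illustration, a rough sketch of that idea (assuming Spark 1.6+, where percent_rank is exposed in org.apache.spark.sql.functions; it maps the percent rank straight onto the probability breakpoints instead of going through the quantile values):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{percent_rank, udf}

val probs = Array(0.0, 0.25, 0.50, 0.75, 1.0)

// Segment = number of breakpoints <= pr, which yields values 1..5 here
val toSegment = udf((pr: Double) => probs.count(_ <= pr))

// No partitioning column, hence the warning discussed below
val w = Window.orderBy($"recency")

val withSegment = df
  .withColumn("pr", percent_rank().over(w))
  .withColumn("Rsegment", toSegment($"pr"))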

Edit

OK, it looks like percent_rank is not a good idea after all:

WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation

I am not exactly sure what the point of moving data to a single partition to call a non-aggregate function is, but it looks like we are back to square one. It is possible to use zipWithIndex on a plain RDD:

import org.apache.spark.sql.{Row, DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField, LongType}
import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1, 297, 114, 733.27),
  (2, 564, 11, 867.66),
  (3, 1304, 1,  35.89),
  (4, 287, 25, 153.08),
  (6, 290, 94, 316.772),
  (8, 1186, 3, 440.21),
  (11, 561, 5, 489.70),
  (14, 333, 57, 123.94)
)).toDF("customerId", "recency", "frequency", "monetary")


df.registerTempTable("df")
sqlContext.cacheTable("df")

A small helper:

def addRowNumber(df: DataFrame): DataFrame = {
  // Prepare new schema
  val schema = StructType(
    StructField("row_number", LongType, false) +: df.schema.fields)
  // Add row number
  val rowsWithIndex = df.rdd.zipWithIndex
    .map{case (row: Row, idx: Long) => Row.fromSeq(idx +: row.toSeq)}
  // Create DataFrame
  sqlContext.createDataFrame(rowsWithIndex, schema)
}

and the actual function:

def findInterval(df: DataFrame, column: Column,
    probSegment: Array[Double], outname: String): DataFrame = {

  val n = df.count
  // Map quantiles to indices
  val breakIndices  = probSegment.map(p => (p * (n - 1)).toLong)

  // Add row number
  val dfWithRowNumber = addRowNumber(df.orderBy(column))

  // Map indices to values
  val breaks  = dfWithRowNumber
    .where($"row_number".isin(breakIndices:_*))
    .select(column.cast("double"))
    .map(_.getDouble(0))
    .collect

  // Get interval
  val f = udf((x: Double) =>
    scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1))

  // Final result
  dfWithRowNumber
    .select($"*", f(column.cast("double")).alias(outname))
    .drop("row_number")
}

and example usage:

scala> val probs = Array(0.0, 0.25, 0.50, 0.75, 1.0)
probs: Array[Double] = Array(0.0, 0.25, 0.5, 0.75, 1.0)

scala>  findInterval(df, $"recency", probs, "interval").show
+----------+-------+---------+--------+--------+
|customerId|recency|frequency|monetary|interval|
+----------+-------+---------+--------+--------+
|         4|    287|       25|  153.08|       1|
|         6|    290|       94| 316.772|       2|
|         1|    297|      114|  733.27|       2|
|        14|    333|       57|  123.94|       3|
|        11|    561|        5|   489.7|       3|
|         2|    564|       11|  867.66|       4|
|         8|   1186|        3|  440.21|       4|
|         3|   1304|        1|   35.89|       5|
+----------+-------+---------+--------+--------+

but I guess it is far from optimal.

Spark 2.0+:

You could replace manual rank computation with DataFrameStatFunctions.approxQuantile. This would allow for faster interval computation:

val relativeError: Double = ???  // choose an acceptable error, e.g. 0.0 for exact quantiles
val breaks = df.stat.approxQuantile("recency", probs, relativeError)
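
A sketch of how those breaks could then feed the same binary-search UDF as above (the relativeError value here is only an illustrative choice):

import org.apache.spark.sql.functions.udf

val probs = Array(0.0, 0.25, 0.50, 0.75, 1.0)
val relativeError = 0.0  // exact quantiles; increase to trade accuracy for speed
val breaks = df.stat.approxQuantile("recency", probs, relativeError)

// Same interval logic as the findInterval UDF above
val toInterval = udf((x: Double) =>
  scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1))

val withInterval = df.withColumn("interval", toInterval($"recency".cast("double")))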
– zero323

This can be achieved with Bucketizer. Using the same DataFrame as in the example above:

import org.apache.spark.ml.feature.Bucketizer

val df = sc.parallelize(Seq(
  (1, 297, 114, 733.27),
  (2, 564, 11, 867.66),
  (3, 1304, 1,  35.89),
  (4, 287, 25, 153.08),
  (6, 290, 94, 316.772),
  (8, 1186, 3, 440.21),
  (11, 561, 5, 489.70),
  (14, 333, 57, 123.94)
)).toDF("customerId", "recency", "frequency", "monetary")

val targetVars = Array("recency", "frequency", "monetary")
val probs = Array(0.0, 0.25, 0.50, 0.75, 1.0)
val outputVars = for (varName <- targetVars) yield varName + "Segment"
val breaksArray = for (varName <- targetVars)
  yield df.stat.approxQuantile(varName, probs, 0.0)

val bucketizer = new Bucketizer()
 .setInputCols(targetVars)
 .setOutputCols(outputVars)
 .setSplitsArray(breaksArray)

val df_e = bucketizer.transform(df)

df_e.show

Result:

targetVars: Array[String] = Array(recency, frequency, monetary)
outputVars: Array[String] = Array(recencySegment, frequencySegment, monetarySegment)
breaksArray: Array[Array[Double]] = Array(Array(287.0, 290.0, 333.0, 564.0, 1304.0), Array(1.0, 3.0, 11.0, 57.0, 114.0), Array(35.89, 123.94, 316.772, 489.7, 867.66))
+----------+-------+---------+--------+--------------+----------------+---------------+
|customerId|recency|frequency|monetary|recencySegment|frequencySegment|monetarySegment|
+----------+-------+---------+--------+--------------+----------------+---------------+
|         1|    297|      114|  733.27|           1.0|             3.0|            3.0|
|         2|    564|       11|  867.66|           3.0|             2.0|            3.0|
|         3|   1304|        1|   35.89|           3.0|             0.0|            0.0|
|         4|    287|       25|  153.08|           0.0|             2.0|            1.0|
|         6|    290|       94| 316.772|           1.0|             3.0|            2.0|
|         8|   1186|        3|  440.21|           3.0|             1.0|            2.0|
|        11|    561|        5|   489.7|           2.0|             1.0|            3.0|
|        14|    333|       57|  123.94|           2.0|             3.0|            1.0|
+----------+-------+---------+--------+--------------+----------------+---------------+
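
Note that the multi-column setters (setInputCols, setOutputCols, setSplitsArray) need Spark 2.3 or later. On an older version, one possible fallback (a sketch under that assumption) is to chain single-column Bucketizers:

// One single-column Bucketizer per variable, folded over the DataFrame
val df_e = targetVars.zip(breaksArray).foldLeft(df) { case (acc, (colName, breaks)) =>
  new Bucketizer()
    .setInputCol(colName)
    .setOutputCol(colName + "Segment")
    .setSplits(breaks)
    .transform(acc)
}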
  • When I asked this question, Bucketizer didn’t exist in spark-ml. I’m not even sure we had DataFrames back then. But thanks anyway – eliasah Jun 21 '19 at 05:43