
I have a dataframe, dfRaw, with three columns: id, index and value.

+---+-----+-------------------+
| id|index|              value|
+---+-----+-------------------+
|  A| 1023|0.09938822262205915|
|  A| 1046| 0.3110047630613805|
|  A| 1069| 0.8486710971453512|
+---+-----+-------------------+

root
 |-- id: string (nullable = true)
 |-- index: integer (nullable = false)
 |-- value: double (nullable = false)

Then I have another dataframe, dfIntervals, which lists the desired periods for each id:

+---+-----------+---------+
| id|start_index|end_index|
+---+-----------+---------+
|  A|       1069|     1276|
|  B|       2066|     2291|
|  B|       1616|     1841|
|  C|       3716|     3932|
+---+-----------+---------+

root
 |-- id: string (nullable = true)
 |-- start_index: integer (nullable = false)
 |-- end_index: integer (nullable = false)

I have three templates, as below:

val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)

The goal is, for each row in dfIntervals, to apply a function (let's assume it's correlation) that receives the values from the value column of dfRaw falling inside that row's interval, together with the three template arrays, and adds three columns to dfIntervals, one per template.

Assumptions:

1 - The size of each template array is exactly 10.

2 - There are no duplicates in the index column of dfRaw.

3 - The start_index and end_index values in dfIntervals exist in the index column of dfRaw, and there are exactly 10 rows between them (bounds included). For instance, dfRaw.filter($"id" === "A").filter($"index" >= 1069 && $"index" <= 1276).count (first row in dfIntervals) results in exactly 10.
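
Assumption 3 can be sanity-checked without Spark. The following is only a sketch in plain Scala collections: it reuses the index ranges and interval bounds from the generating code in this question, and the names `rawIndices`, `intervals` and `rowsPerInterval` are made up for illustration.

```scala
// Same index ranges as the ones used to build dfRaw, keyed by id (plain Scala, no Spark).
val rawIndices: Map[String, Seq[Int]] = Map(
  "A" -> (1023 to 1603 by 23),
  "B" -> (341 to 2300 by 25),
  "C" -> (2756 to 3954 by 24)
)

// Same (id, start_index, end_index) rows as dfIntervals.
val intervals: Seq[(String, Int, Int)] = Seq(
  ("A", 1069, 1276),
  ("B", 2066, 2291),
  ("B", 1616, 1841),
  ("C", 3716, 3932)
)

// Number of raw indices that fall inside each interval (both bounds inclusive).
val rowsPerInterval: Seq[Int] = intervals.map { case (id, start, end) =>
  rawIndices(id).count(i => i >= start && i <= end)
}
```

Each entry of `rowsPerInterval` should equal the template length (10) if the assumption holds.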

Here's the code that generates these dataframes:

import org.apache.spark.sql.functions._
val mySeed = 1000

/* Defining templates for correlation analysis*/
val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)

/* Defining raw data*/
var dfRaw = Seq(
  ("A", (1023 to 1603 by 23).toArray),
  ("B", (341 to 2300 by 25).toArray),
  ("C", (2756 to 3954 by 24).toArray)
).toDF("id", "index")
dfRaw = dfRaw.select($"id", explode($"index") as "index").withColumn("value", rand(seed=mySeed))

/* Defining intervals*/
var dfIntervals = Seq(
  ("A", 1069, 1276),
  ("B", 2066, 2291),
  ("B", 1616, 1841),
  ("C", 3716, 3932)
).toDF("id", "start_index", "end_index")

The result should be three columns added to the dfIntervals dataframe, named corr_w_template1, corr_w_template2 and corr_w_template3.

PS: I could not find a correlation function in Scala. Let's assume such a function exists (as below) and that we can make a udf out of it if needed.

def correlation(arr1: Array[Double], arr2: Array[Double]): Double
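
For reference, one possible body for this signature is sketched below. It assumes that "correlation" means the Pearson correlation coefficient, which the question does not actually specify, so treat it as a placeholder rather than the intended function.

```scala
// Pearson correlation of two equal-length arrays.
// Assumption: "correlation" in the question means Pearson's r.
def correlation(arr1: Array[Double], arr2: Array[Double]): Double = {
  require(arr1.length == arr2.length && arr1.nonEmpty,
    "arrays must be non-empty and of equal length")
  val mean1 = arr1.sum / arr1.length
  val mean2 = arr2.sum / arr2.length
  // Centered cross-product and the two centered sums of squares.
  val cov  = arr1.zip(arr2).map { case (a, b) => (a - mean1) * (b - mean2) }.sum
  val var1 = arr1.map(a => math.pow(a - mean1, 2)).sum
  val var2 = arr2.map(b => math.pow(b - mean2, 2)).sum
  // If either input is constant this is 0.0 / 0.0, so NaN is returned.
  cov / math.sqrt(var1 * var2)
}
```

Inside a udf, the array arguments arrive as `Seq[Double]`, so a wrapper would need to call `.toArray` before delegating to this function.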
ahoosh
  • As I understand it, you need a udf like the following: `def correlation(value: Double, template: Array[Double]): Double`, where `template` can be one of `template1`, `template2`, `template3`, and `value` comes from `dfRaw`. Right? – tauitdnmd Aug 27 '18 at 03:02
  • That's correct. I suppose the function needs to be applied three times to get the correlation between the raw signal and each of the templates. – ahoosh Aug 27 '18 at 03:06
  • `dfIntervals` contains `start_index` & `end_index`, so it should be `correlation(values: Array[Double], template: Array[Double]): Double`, right? Here `values` is obtained from the rows of `dfRaw` whose `index` is in the range [start_index, end_index]. – tauitdnmd Aug 27 '18 at 04:16
  • Yes, that is correct. – ahoosh Aug 28 '18 at 16:24

1 Answer


OK.

Let's define a UDF.

For testing purposes, let's say it always returns 1.

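import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

// Placeholder: always returns 1.0; replace the body with the real correlation.
// Array columns are passed to a Scala UDF as Seq.
val correlation = udf((values: Seq[Double], template: Seq[Double]) => {
  1.0
})

// Sorts the collected (index, value) structs by index and keeps only the values.
val orderUdf = udf((values: Seq[Row]) => {
  values.sortBy(r => r.getAs[Int](0)).map(r => r.getAs[Double](1))
})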

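Then let's join your two dataframes on the rules defined above (same id, and index within [start_index, end_index]) and collect the matching (index, value) pairs into one column called values. We also apply orderUdf so that the values are ordered by index.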

 val df = dfIntervals.join(dfRaw,dfIntervals("id") === dfRaw("id") && dfIntervals("start_index")  <= dfRaw("index") && dfRaw("index") <= dfIntervals("end_index") )
    .groupBy(dfIntervals("id"), dfIntervals("start_index"),  dfIntervals("end_index"))
    .agg(orderUdf(collect_list(struct(dfRaw("index"), dfRaw("value")))).as("values"))
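
To make clear what this join, group and sort is meant to produce, here is the same logic written against plain Scala collections. It is only an illustrative sketch, not Spark code; `RawRow`, `Interval` and `valuesPerInterval` are hypothetical names standing in for the rows of dfRaw and dfIntervals.

```scala
// Hypothetical stand-ins for the rows of dfRaw and dfIntervals.
case class RawRow(id: String, index: Int, value: Double)
case class Interval(id: String, startIndex: Int, endIndex: Int)

// For each interval: keep the rows with the same id whose index lies in
// [startIndex, endIndex], sort them by index, and keep only the values.
def valuesPerInterval(raw: Seq[RawRow], intervals: Seq[Interval]): Seq[(Interval, Seq[Double])] =
  intervals.map { iv =>
    val window = raw
      .filter(r => r.id == iv.id && r.index >= iv.startIndex && r.index <= iv.endIndex)
      .sortBy(_.index)
      .map(_.value)
    (iv, window)
  }
```

The id check corresponds to the `dfIntervals("id") === dfRaw("id")` join condition, and the `sortBy(_.index)` step plays the role of `orderUdf`.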

Finally, apply the correlation udf once per template and show the result.

df.withColumn("corr_w_template1",correlation(df("values"), lit(template1)))
    .withColumn("corr_w_template2",correlation(df("values"), lit(template2)))
    .withColumn("corr_w_template3",correlation(df("values"), lit(template3)))
    .show(10)

Here is the full example code:

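  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{Row, SparkSession, functions}
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types.DataTypes
  import scala.collection.JavaConverters._
  import scala.collection.mutable

  val conf = new SparkConf().setAppName("learning").setMaster("local[2]")

  val session = SparkSession.builder().config(conf).getOrCreate()
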
  val mySeed = 1000

  /* Defining templates for correlation analysis*/
  val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
  val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
  val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)

  val schema1 =  DataTypes.createStructType(Array(
    DataTypes.createStructField("id",DataTypes.StringType,false),
    DataTypes.createStructField("index",DataTypes.createArrayType(DataTypes.IntegerType),false)
  ))

  val schema2 =  DataTypes.createStructType(Array(
    DataTypes.createStructField("id",DataTypes.StringType,false),
    DataTypes.createStructField("start_index",DataTypes.IntegerType,false),
    DataTypes.createStructField("end_index",DataTypes.IntegerType,false)
  ))

  /* Defining raw data*/
  var dfRaw = session.createDataFrame(Seq(
    ("A", (1023 to 1603 by 23).toArray),
    ("B", (341 to 2300 by 25).toArray),
    ("C", (2756 to 3954 by 24).toArray)
  ).map(r => Row(r._1 , r._2)).asJava, schema1)

  dfRaw = dfRaw.select(dfRaw("id"), explode(dfRaw("index")) as "index")
    .withColumn("value", rand(seed=mySeed))

  /* Defining intervals*/
  var dfIntervals =  session.createDataFrame(Seq(
    ("A", 1069, 1276),
    ("B", 2066, 2291),
    ("B", 1616, 1841),
    ("C", 3716, 3932)
  ).map(r => Row(r._1 , r._2,r._3)).asJava, schema2)

  //Define udf

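  // Placeholder: always returns 1.0; replace the body with the real correlation.
  // Array columns are passed to a Scala UDF as Seq.
  val correlation = udf((values: Seq[Double], template: Seq[Double]) => {
    1.0
  })

  // Sorts the collected (index, value) structs by index and keeps only the values.
  val orderUdf = udf((values: Seq[Row]) => {
    values.sortBy(r => r.getAs[Int](0)).map(r => r.getAs[Double](1))
  })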


  val df = dfIntervals.join(dfRaw,dfIntervals("id") === dfRaw("id") && dfIntervals("start_index")  <= dfRaw("index") && dfRaw("index") <= dfIntervals("end_index") )
    .groupBy(dfIntervals("id"), dfIntervals("start_index"),  dfIntervals("end_index"))
    .agg(orderUdf(collect_list(struct(dfRaw("index"), dfRaw("value")))).as("values"))

  df.withColumn("corr_w_template1",correlation(df("values"), lit(template1)))
    .withColumn("corr_w_template2",correlation(df("values"), lit(template2)))
    .withColumn("corr_w_template3",correlation(df("values"), lit(template3)))
    .show(10,false)
tauitdnmd
  • I think that `dfRaw("id") === dfIntervals("id")` should be added to the join conditions, giving `dfIntervals.join(dfRaw, dfIntervals("start_index") <= dfRaw("index") && dfRaw("index") <= dfIntervals("end_index") && dfRaw("id") === dfIntervals("id"))`. Otherwise, there will be contamination from other ids. When we do that, the order of the values gets reversed! We need to include `index` in `collect_list` and use a sorter udf to make sure the values in the list are in order. There is a Scala version of this [here](https://stackoverflow.com/questions/46580253/collect-list-by-preserving-order-based-on-another-variable) – ahoosh Aug 27 '18 at 17:56
  • yeah. That's my mistake. I have updated the solution – tauitdnmd Aug 27 '18 at 17:57
  • Great. Thanks for the update. Is there a way to make sure the `values` column is already ordered in `df`? While this can be done inside the `udf` function, it would be cleaner to do it that way, similar to [this Python solution](https://stackoverflow.com/questions/46580253/collect-list-by-preserving-order-based-on-another-variable) – ahoosh Aug 27 '18 at 18:08
  • I meant to order the values based on index. Currently they are sorted based on the values! Perhaps using `collect_list(struct(dfRaw("index"),dfRaw("value")))` and then using a map, something along the lines of `(key, xs) => (key, xs.map(_.c2).toSeq.sortBy(_._2))`, will do that? – ahoosh Aug 27 '18 at 18:35
  • Yes, it's great. – tauitdnmd Aug 27 '18 at 18:43
  • Please check it out again. – tauitdnmd Aug 27 '18 at 19:11