Create a case class that matches the input schema of the Dataset. Group the Dataset by id with groupByKey, then inside mapGroups use foldLeft to accumulate the average at each index of the vectors in a group.
scala> case class Test(id: Int, vec: List[Double])
defined class Test
scala> val inputList = List(
| Test(0, List(1, 2, 3, 4)),
| Test(0, List(2, 3, 4, 5)),
| Test(0, List(6, 7, 8, 9)),
| Test(1, List(1, 2, 3, 4)),
| Test(1, List(5, 6, 7, 8)))
inputList: List[Test] = List(Test(0,List(1.0, 2.0, 3.0, 4.0)), Test(0,List(2.0, 3.0, 4.0, 5.0)), Test(0,List(6.0, 7.0, 8.0, 9.0)), Test(1,List(1.0, 2.0, 3.0, 4.0)), Test(1,List(5.0, 6.0, 7.0, 8.0)))
scala>
scala> import spark.implicits._
import spark.implicits._
scala> val ds = inputList.toDF.as[Test]
ds: org.apache.spark.sql.Dataset[Test] = [id: int, vec: array<double>]
scala> ds.show(false)
+---+--------------------+
|id |vec |
+---+--------------------+
|0 |[1.0, 2.0, 3.0, 4.0]|
|0 |[2.0, 3.0, 4.0, 5.0]|
|0 |[6.0, 7.0, 8.0, 9.0]|
|1 |[1.0, 2.0, 3.0, 4.0]|
|1 |[5.0, 6.0, 7.0, 8.0]|
+---+--------------------+
scala>
scala> val outputDS = ds.groupByKey(_.id).mapGroups {
| case (key, valuePairs) =>
| val vectors = valuePairs.map(_.vec).toArray
| // number of vectors (rows) in this group, used as the divisor
| val len = vectors.length
| // average the values at each index across the group's vectors
| val avg = vectors.head.indices.foldLeft(List[Double]()) {
| case (acc, idx) =>
| val sumOfIdx = vectors.map(_ (idx)).sum
| acc :+ (sumOfIdx / len)
| }
| Test(key, avg)
| }
outputDS: org.apache.spark.sql.Dataset[Test] = [id: int, vec: array<double>]
scala> outputDS.show(false)
+---+--------------------+
|id |vec |
+---+--------------------+
|1 |[3.0, 4.0, 5.0, 6.0]|
|0 |[3.0, 4.0, 5.0, 6.0]|
+---+--------------------+
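The foldLeft above can equivalently be written with transpose, which collects the values at each index into one column before averaging. The sketch below is just the same arithmetic applied to the id = 0 vectors from the input, in plain Scala with no Spark involved:

// Same per-index average, sketched with transpose on the id = 0 vectors from the input above.
val vectors = List(
  List(1.0, 2.0, 3.0, 4.0),
  List(2.0, 3.0, 4.0, 5.0),
  List(6.0, 7.0, 8.0, 9.0))
val len = vectors.length
// transpose turns rows into columns, so each inner list holds the values at one index
val avg = vectors.transpose.map(_.sum / len)
// avg == List(3.0, 4.0, 5.0, 6.0), matching the row for id 0 in the output above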
Hope this helps!