I am computing a series of Dataset aggregations using Scala's /: (fold left) operator. The aggregation code is listed below:
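import scala.math.{abs, log}

// DATASET and EPS are defined elsewhere in my code (not shown)
def execute1(
    xy: DATASET,
    f: Double => Double): Double = {
  println("PRINTING: The data points being evaluated: " + xy)
  println("PRINTING: Running execute1")
  // Drop points whose y is (numerically) zero so log(px / y) stays finite
  val z = xy.filter { case (x, y) => abs(y) > EPS }
  // Sum f(x) * log(f(x) / y) over the remaining points, then negate
  val ret = -z./:(0.0) { case (s, (x, y)) =>
    val px = f(x)
    s + px * log(px / y)
  }
  ret
}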
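Written out in my own notation, with (x_i, y_i) the points in xy, execute1 computes the quantity below. Note that when no points survive the filter, the sum is empty and the code returns -(0.0), which prints as -0.0.

$$
\text{ret} = -\sum_{i\,:\,|y_i| > \text{EPS}} f(x_i)\,\log\frac{f(x_i)}{y_i}
$$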
My issue occurs when I run this block for several separate functions, each passed in as the f parameter. The functions are stored in this map:
lazy val pdfs = Map[Int, Double => Double](
  1 -> betaScaled,
  2 -> gammaScaled,
  3 -> logNormal,
  4 -> uniform,
  5 -> chiSquaredScaled
)
The executor function that runs the aggregation once for each function in the list is:
def execute2(
    xy: DATASET,
    fs: Iterable[Double => Double]): Iterable[Double] = {
  // Call execute1 once per function, always with the same xy
  fs.map(execute1(xy, _))
}
And this is the final execution block:
val kl_rdd = master_ds.mapPartitions((it: DATASET) => {
  // Look up each broadcast key in the pdfs map
  val pdfsList = pdfs_broadcast.value.map(n => pdfs(n))
  execute2(it, pdfsList).iterator
})
The problem is that, although the aggregations do run, they all seem to end up in the first slot of the output, whereas I want a separate value for each function. I ran tests confirming that all five functions are actually being called and that their results are being summed into the first slot. The values I get are:
The pre-divergence value: -4.999635700491883
The pre-divergence value: -0.0
The pre-divergence value: -0.0
The pre-divergence value: -0.0
The pre-divergence value: -0.0
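Below is a Spark-free sketch that mimics this setup, to make the behavior easier to test. It assumes that DATASET is an alias for Iterator[(Double, Double)], since mapPartitions hands its closure an Iterator over the partition's elements; the EPS value, the sample points, and the three functions are hypothetical stand-ins for my real EPS, master_ds, and pdfs. It uses foldLeft, which is what /: delegates to.

```scala
import scala.math.{abs, log}

object IteratorReuseRepro {
  // Assumption: DATASET is the element iterator that mapPartitions provides
  type DATASET = Iterator[(Double, Double)]

  // Hypothetical threshold; the real EPS is not shown above
  val EPS = 1e-12

  // Same computation as execute1, minus the println calls
  def execute1(xy: DATASET, f: Double => Double): Double = {
    val z = xy.filter { case (_, y) => abs(y) > EPS }
    -z.foldLeft(0.0) { case (s, (x, y)) =>
      val px = f(x)
      s + px * log(px / y)
    }
  }

  // Same as execute2: every function receives the same xy
  def execute2(xy: DATASET, fs: Iterable[Double => Double]): Iterable[Double] =
    fs.map(execute1(xy, _))

  // Hypothetical data and functions standing in for master_ds and pdfs
  val points: Vector[(Double, Double)] = Vector((0.25, 0.5), (0.5, 0.25), (0.75, 0.25))
  val fs: List[Double => Double] = List(x => x, x => 1.0 - x, _ => 0.5)

  def main(args: Array[String]): Unit = {
    // One iterator shared by all calls, as inside mapPartitions
    val shared = execute2(points.iterator, fs).toList
    // A fresh iterator for each function, for comparison
    val fresh = fs.map(f => execute1(points.iterator, f))
    println(shared)
    println(fresh)
  }
}
```

In this sketch, the first entry of shared equals the first entry of fresh, and the remaining entries of shared are -0.0, while every entry of fresh is nonzero. A Scala Iterator can be traversed only once, so the first fold consumes it and later calls see no points. If DATASET really is an Iterator in my code, that would produce the same pattern as the output above.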
This is one of the hardest problems I've ever run into, so any direction would be GREATLY appreciated. I will give credit where it's due. Thanks!