I have this class:
case class IDADiscretizer(
    nAttrs: Int,
    nBins: Int = 5,
    s: Int = 5) extends Serializable {
  private[this] val log = LoggerFactory.getLogger(this.getClass)
  private[this] val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
  private[this] val randomReservoir = SamplingUtils.reservoirSample((1 to s).toList.iterator, 1)
  def updateSamples(v: LabeledVector): Vector[IntervalHeapWrapper] = {
    val attrs = v.vector.map(_._2)
    val label = v.label
    // TODO: Check for missing values
    attrs
      .zipWithIndex
      .foreach {
        case (attr, i) =>
          if (V(i).getNbSamples < s) {
            V(i) insertValue attr // insert
          } else {
            if (randomReservoir(0) <= s / (i + 1)) {
              //val randVal = Random nextInt s
              //V(i) replace (randVal, attr)
              V(i) insertValue attr
            }
          }
      }
    V
  }
  /**
   * Return the cutpoints for the discretization
   */
  def cutPoints: Vector[Vector[Double]] = V map (_.getBoundaries.toVector)
  def discretize(data: DataSet[LabeledVector]): (DataSet[Vector[IntervalHeapWrapper]], Vector[Vector[Double]]) = {
    val r = data map (x => updateSamples(x))
    val c = cutPoints
    (r, c)
  }
}
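A side note on updateSamples: once a heap is full, the replacement test compares a single random number (drawn once, when the object is created) against s / (i + 1), where i is the attribute index and / is integer division. Textbook reservoir sampling (Algorithm R) instead keeps the n-th value seen with probability s / n and overwrites a uniformly chosen slot, which is what the commented-out replace call seems to be aiming for. A minimal plain Scala sketch for a single attribute, assuming a hypothetical Reservoir class in place of IntervalHeapWrapper (whose API is not shown here):

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical fixed-size reservoir for one attribute (Algorithm R).
final class Reservoir(s: Int, rng: Random) extends Serializable {
  private val buf = ArrayBuffer.empty[Double]
  private var seen = 0 // number of values offered so far

  def offer(x: Double): Unit = {
    seen += 1
    if (buf.size < s) {
      buf += x // fill phase: keep the first s values
    } else {
      val j = rng.nextInt(seen) // uniform in [0, seen)
      if (j < s) buf(j) = x     // true with probability s / seen
    }
  }

  def samples: Vector[Double] = buf.toVector
  def count: Int = seen
}
```

The decision depends on how many values have been offered so far, not on which attribute the value belongs to, so each attribute would need its own counter.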
Using Flink, I would like to get the cut points after calling discretize, but the information stored in V seems to get lost. Do I have to use a broadcast variable, like in this question? Is there a better way to access the state of the class?
I've tried to call cutPoints in two ways. The first one is:
def discretize(data: DataSet[LabeledVector]) = data map (x => updateSamples(x))
Then I call it from outside:
val a = IDADiscretizer(nAttrs = 4)
val r = a.discretize(dataSet)
r.print
val cuts = a.cutPoints
Here, cuts is empty, so I tried to compute both the discretization and the cut points inside discretize:
def discretize(data: DataSet[LabeledVector]) = {
  val r = data map (x => updateSamples(x))
  val c = cutPoints
  (r, c)
}
And use it like this:
val a = IDADiscretizer(nAttrs = 4)
val (d, c) = a.discretize(dataSet)
c foreach println
But the same happens.
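This is expected from Flink's execution model: DataSet transformations are lazy. data map (x => updateSamples(x)) only adds an operator to the job plan, and nothing runs until print(), collect() or env.execute() triggers the job. So val c = cutPoints is evaluated while the plan is being built, before a single record has been processed. The ordering problem can be reproduced locally with a lazy Scala view; this is an analogy using a hypothetical Sampler class, not Flink code:

```scala
import scala.collection.mutable.ArrayBuffer

object LazyPlanDemo {
  // Hypothetical stand-in for IDADiscretizer: mutable state plus a getter.
  final class Sampler {
    val seen = ArrayBuffer.empty[Double]
    def record(x: Double): Double = { seen += x; x }
    def cutPoints: Vector[Double] = seen.toVector
  }

  def run(): (Vector[Double], Vector[Double], Vector[Double]) = {
    val sampler = new Sampler
    // Like DataSet.map: describes the work, evaluates nothing yet.
    val plan = Vector(1.0, 2.0, 3.0).view.map(sampler.record)
    val before = sampler.cutPoints // read while only the "plan" exists
    val result = plan.toVector     // forcing the view plays the role of print/collect
    val after = sampler.cutPoints  // state is visible only after evaluation
    (before, result, after)
  }
}
```

Here before is empty and after contains the three values. In Flink, reading the state after print() still does not help, because the updates happen on copies of the object inside the tasks rather than on the local instance.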
Finally, I've also tried to make V completely public:
val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
It is still empty.
What am I doing wrong?
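Apart from the fact that DataSet transformations only run once print(), collect() or execute() is called, there is a second reason V stays empty, and why making it public changes nothing: the lambda in data map (x => updateSamples(x)) captures the IDADiscretizer instance, and Flink serializes that function and gives every parallel task its own deserialized copy. updateSamples mutates those copies inside the tasks, while the instance held by the client program is never modified. The plain Scala sketch below imitates this with Java serialization; Recorder and roundTrip are hypothetical names, and Flink's real shipping path involves more steps:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import scala.collection.mutable.ArrayBuffer

object ClosureCopyDemo {
  // Hypothetical stand-in for IDADiscretizer: Serializable, with mutable state.
  final class Recorder extends Serializable {
    val values = ArrayBuffer.empty[Int]
    def record(x: Int): Int = { values += x; x }
  }

  // Serialize and deserialize an object: a rough model of a function
  // (and everything it captures) being shipped to a parallel task.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this code's class loader (needed in some REPL/script setups).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def run(): (Recorder, Recorder) = {
    val local = new Recorder             // the instance held by the client program
    val shipped = roundTrip(local)       // the copy a task would work on
    Seq(1, 2, 3).foreach(shipped.record) // the "map" updates only the copy
    (local, shipped)
  }
}
```

After run(), the local recorder is still empty while the shipped copy holds 1, 2, 3. To bring such state back to the client, it has to travel through the job output (for example with collect()) or through Flink accumulators.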
Answer
Thanks to @TillRohrmann, this is what I finally did:
private[this] def computeCutPoints(x: LabeledVector) = {
  val attrs = x.vector.map(_._2)
  val label = x.label
  attrs
    .zipWithIndex
    .foldLeft(V) {
      case (iv, (v, i)) =>
        iv(i) insertValue v
        iv
    }
}
/**
 * Return the cutpoints for the discretization
 */
def cutPoints(data: DataSet[LabeledVector]): Seq[Seq[Double]] =
  data.map(computeCutPoints _)
    .collect
    .last.map(_.getBoundaries.toVector)
def discretize(data: DataSet[LabeledVector]): DataSet[LabeledVector] =
  data.map(updateSamples _)
And then I use it like this:
val a = IDADiscretizer(nAttrs = 4)
val d = a.discretize(dataSet)
val cuts = a.cutPoints(dataSet)
d.print
cuts foreach println
I do not know if it is the best way, but at least it is working now.
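One caveat with this approach: it only yields complete cut points when the map runs with parallelism 1. With higher parallelism, every parallel task works on its own deserialized copy of the discretizer, and collect() makes no ordering guarantee, so .last returns the state of whichever task produced the last element, built from that task's share of the data only. A common alternative is to build one summary per partition and merge the summaries afterwards (in Flink, roughly mapPartition followed by collect() or reduce). The plain Scala sketch below simulates partitions as nested sequences; Summary and its min/max merge are hypothetical placeholders for a single attribute, because merging IntervalHeapWrapper instances depends on an API not shown here:

```scala
object PartitionMergeDemo {
  // Hypothetical per-attribute summary; a real version would merge
  // interval heaps or reservoirs instead of tracking min/max.
  final case class Summary(min: Double, max: Double, count: Long) {
    def add(x: Double): Summary = Summary(math.min(min, x), math.max(max, x), count + 1)
    def merge(o: Summary): Summary =
      Summary(math.min(min, o.min), math.max(max, o.max), count + o.count)
  }
  val Empty = Summary(Double.PositiveInfinity, Double.NegativeInfinity, 0L)

  // One summary per partition (what each parallel task would emit)...
  def perPartition(partitions: Seq[Seq[Double]]): Seq[Summary] =
    partitions.map(_.foldLeft(Empty)(_ add _))

  // ...then a single merged view, computed once every partition has reported.
  def merged(partitions: Seq[Seq[Double]]): Summary =
    perPartition(partitions).foldLeft(Empty)(_ merge _)
}
```

For example, with partitions Seq(1.0, 5.0) and Seq(-2.0, 3.0, 4.0), the last per-partition summary has max 4.0 and count 3, while the merged summary has max 5.0 and count 5, so only the merged result covers every record.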