
I have this class:

case class IDADiscretizer(
  nAttrs: Int,
  nBins: Int = 5,
  s: Int = 5) extends Serializable {

  private[this] val log = LoggerFactory.getLogger(this.getClass)
  private[this] val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
  private[this] val randomReservoir = SamplingUtils.reservoirSample((1 to s).toList.iterator, 1)

  def updateSamples(v: LabeledVector): Vector[IntervalHeapWrapper] = {
    val attrs = v.vector.map(_._2)
    val label = v.label
    // TODO: Check for missing values
    attrs
      .zipWithIndex
      .foreach {
        case (attr, i) =>
          if (V(i).getNbSamples < s) {
            V(i) insertValue attr // insert
          } else {
            if (randomReservoir(0) <= s / (i + 1)) {
              //val randVal = Random nextInt s
              //V(i) replace (randVal, attr)
              V(i) insertValue attr
            }
          }
      }
    V
  }

  /**
   * Return the cut points for the discretization.
   */
  def cutPoints: Vector[Vector[Double]] = V map (_.getBoundaries.toVector)

  def discretize(data: DataSet[LabeledVector]): (DataSet[Vector[IntervalHeapWrapper]], Vector[Vector[Double]]) = {
    val r = data map (x => updateSamples(x))
    val c = cutPoints

    (r, c)
  }
}

Using Flink, I would like to get the cut points after the call to discretize, but it seems the information stored in V gets lost. Do I have to use a broadcast variable like in this question? Is there a better way to access the state of the class?

I've tried to call cutPoints in two ways. One of them is:

def discretize(data: DataSet[LabeledVector]) = data map (x => updateSamples(x))

Then, called from outside:

val a = IDADiscretizer(nAttrs = 4)
val r = a.discretize(dataSet)
r.print
val cuts = a.cutPoints

Here, cuts is empty, so I tried to compute the discretization as well as the cut points inside discretize:

def discretize(data: DataSet[LabeledVector]) = {
    val r = data map (x => updateSamples(x))
    val c = cutPoints

    (r, c)
  }

And use it like this:

val a = IDADiscretizer(nAttrs = 4)
val (d, c) = a.discretize(dataSet)
c foreach println

But the same happens.

Finally, I've also tried to make V completely public:

val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))

It is still empty.

What am I doing wrong?

Answer

Thanks to @TillRohrmann what I finally did was:

  private[this] def computeCutPoints(x: LabeledVector) = {
    val attrs = x.vector.map(_._2)
    val label = x.label
    attrs
      .zipWithIndex
      .foldLeft(V) {
        case (iv, (v, i)) =>
          iv(i) insertValue v
          iv
      }
  }

  /**
   * Return the cut points for the discretization.
   */
  def cutPoints(data: DataSet[LabeledVector]): Seq[Seq[Double]] =
    data.map(computeCutPoints _)
      .collect
      .last.map(_.getBoundaries.toVector)

  def discretize(data: DataSet[LabeledVector]): DataSet[LabeledVector] =
    data.map(updateSamples _)

And then use it like this:

val a = IDADiscretizer(nAttrs = 4)
val d = a.discretize(dataSet)
val cuts = a.cutPoints(dataSet)
d.print
cuts foreach println

I do not know if it is the best way, but at least it is working now.

Alejandro Alcalde

1 Answer


The way Flink works is that the user defines operators/user-defined functions which operate on input data coming from a source function. In order to execute a program, the user code is sent to the Flink cluster where it is executed. The results of the computation have to be output to some storage system via a sink function.

Due to this, it is not possible to easily mix local and distributed computations as you are trying to do with your solution. What discretize does is define a map operator which transforms the input DataSet data. This operation will be executed once you call ExecutionEnvironment#execute or DataSet#print, for example. At that point, the user code and the definition of IDADiscretizer are sent to the cluster, where they are instantiated. Flink will update the values in an instance of IDADiscretizer which is not the same instance as the one you have on the client.
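To see why V stays empty on the client, here is a minimal, Flink-free sketch of that effect. The `Counter` class and `roundTrip` helper are hypothetical, purely for illustration: any state a shipped function mutates lives in a deserialized copy, not in the instance the client holds.

```scala
import java.io._

// Hypothetical minimal stand-in for the discretizer: a class with mutable state.
class Counter extends Serializable {
  var n: Int = 0
  def inc(): Unit = n += 1
}

object InstanceCopyDemo {
  // Round-trip an object through Java serialization, the same mechanism Flink
  // uses when shipping user functions to the cluster.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val local = new Counter       // the instance on the "client"
    val remote = roundTrip(local) // the deserialized copy on the "cluster"
    remote.inc()                  // mutations happen on the copy only
    println(local.n)              // 0: the client instance never sees them
    println(remote.n)             // 1
  }
}
```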

Till Rohrmann
  • Thanks for your reply. What is the best approach, then? Should I use broadcasts, or try to chain the transformations? – Alejandro Alcalde Jul 03 '18 at 08:22
  • What you should do is perform the `cutPoints` calculation on the cluster and write the result to a sink, or retrieve it via `collect`. If you have all the information for `V` available after running `discretize`, then you should directly call `cutPoints` in the same function. If you need to aggregate the results of `discretize`, then you have to add a grouping operation and apply `cutPoints` in the grouping function. – Till Rohrmann Jul 03 '18 at 09:55
  • Thank you very much, I'll try it. – Alejandro Alcalde Jul 03 '18 at 10:08
  • By _If you have all the information for V available after running discretize then you should directly call cutPoints in the same function_ you mean this? `val d = data.map(updateSamples(_)); val c = cutPoints` This way I still get an empty result. – Alejandro Alcalde Jul 03 '18 at 10:36
  • It should happen within the map function on the `DataSet`, and the result should then be a `DataSet`. – Till Rohrmann Jul 03 '18 at 10:51
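Till's suggestion (compute the cut points inside the dataflow and only hand back the finished result) can be sketched with plain Scala collections standing in for a Flink `DataSet`. The `Range` class below is a hypothetical stand-in for `IntervalHeapWrapper`: the per-attribute state is folded up during the single pass over the data, which mirrors what retrieving the cluster-side state via `collect` achieves.

```scala
// Hypothetical per-attribute state, standing in for IntervalHeapWrapper.
final case class Range(min: Double, max: Double) {
  def insert(v: Double): Range = Range(math.min(min, v), math.max(max, v))
}

object CutPointsDemo {
  // One pass over the data updates the per-attribute state; the "cut points"
  // are read from the final state rather than from a client-side instance.
  def cutPoints(data: Seq[Seq[Double]], nAttrs: Int): Seq[Range] =
    data.foldLeft(Seq.fill(nAttrs)(Range(Double.MaxValue, Double.MinValue))) {
      (state, row) =>
        state.zip(row).map { case (r, v) => r.insert(v) }
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(Seq(1.0, 10.0), Seq(3.0, 7.0), Seq(2.0, 12.0))
    // Per-attribute min/max after a single pass over the data.
    println(cutPoints(data, nAttrs = 2))
  }
}
```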