I have a DataSet read from a CSV file:

val dataSet = env.readCsvFile[ElecNormNew](
      getClass.getResource("/elecNormNew.arff").getPath,
      pojoFields = Array("date", "day", "period", "nswprice", "nswdemand", "vicprice", "vicdemand", "transfer", "label")
)

As far as I know, ElecNormNew is a POJO:

// elecNormNew POJO
class ElecNormNew(
  var date: Double,
  var day: Int,
  var period: Double,
  var nswprice: Double,
  var nswdemand: Double,
  var vicprice: Double,
  var vicdemand: Double,
  var transfer: Double,
  var label: String) extends Serializable {

  def this() = {
    this(0, 0, 0, 0, 0, 0, 0, 0, "")
  }
}

I also have a simple class:

case class Discretizer[T](
  data: DataSet[T],
  nBins: Int = 5,
  s: Int = 1000) {

  private[this] val log = LoggerFactory.getLogger("Discretizer")
  private[this] val V = Vector.tabulate(10)(_ => IntervalHeap(nBins, 1, 1, s))

  private[this] def updateSamples(x: T): Vector[IntervalHeap] = {
    log.warn(s"$x")
    V
  }

  def discretize() = {
    data map (x => updateSamples(x))
  }
}

But when I try to use it, from a test for example:

val a = new Discretizer[ElecNormNew](dataSet)
a.discretize

I am getting the following error:

org.apache.flink.api.common.InvalidProgramException: Task not serializable
// ...
[info]     at com.elbauldelprogramador.discretizers.IDADiscretizer.discretize(IDADiscretizer.scala:69)
// ...
[info]     Cause: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
// ...

I've read these questions and their answers, without luck:

jjmerelo
Alejandro Alcalde

1 Answer

I would say the first link you have mentioned provides the answer:

The problem is that you reference the DataSet pages from within a MapFunction. This is not possible, since a DataSet is only the logical representation of a data flow and cannot be accessed at runtime.

discretize calls map on the DataSet held by the Discretizer instance, so the same applies here.
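To make the mechanism concrete: the lambda passed to map calls updateSamples, which is an instance method, so the closure captures `this` (the whole Discretizer), and with it the `data` field. The sketch below reproduces that effect with plain Java serialization and no Flink dependency. `FakeDataSet`, `Holder`, and `ClosureCheck` are hypothetical names invented for this illustration, and Java serialization is only assumed to be a reasonable stand-in for the serializability check Flink performs on the function.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for Flink's DataSet: deliberately NOT Serializable.
class FakeDataSet

// Mirrors the shape of Discretizer: a case class holding the data set as a field.
case class Holder(data: FakeDataSet, nBins: Int = 5) {

  // Instance method, like updateSamples in the question.
  private def update(x: Int): Int = x + nBins

  // Calls an instance method, so the closure captures `this` and,
  // through it, the non-serializable `data` field.
  def capturing: Int => Int = x => update(x)

  // Copies the needed value into a local val first, so the closure
  // captures only an Int and never touches `this`.
  def standalone: Int => Int = {
    val bins = nBins
    x => x + bins
  }
}

object ClosureCheck {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

With these definitions, `ClosureCheck.serializes(Holder(new FakeDataSet).capturing)` should report a failure, while the `standalone` variant should serialize. Applied to the question, this suggests keeping the DataSet out of the object the closure refers to, for example by passing it to discretize as a parameter instead of storing it as a constructor field; any other members the closure uses would need to be serializable as well.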

jjmerelo
  • But if I understand correctly, the problem is when you reference the DataSet within the map function, `Pages` in that case, but I am providing a data point to the function, not the `DataSet[T]` itself. – Alejandro Alcalde Jun 06 '18 at 18:09
  • @elbaulp but you still get the same error, so try and use another class to represent data before applying a map to it. – jjmerelo Jun 06 '18 at 18:13