I know about How to store custom objects in Dataset? but still, it is not really clear for me how to build this custom encoder which properly serializes to multiple fields. Manually, I created some functions https://github.com/geoHeil/geoSparkScalaSample/blob/master/src/main/scala/myOrg/GeoSpark.scala#L122-L154 wich map a Polygon back and forth between Dataset - RDD - Dataset
by mapping the objects to primitive types spark can handle i.e. a tuple (String, Int)
(edit: full code below).
For example, to go from the Polygon Object to a tuple of (String, Int)
I use the following
def writeSerializableWKT(iterator: Iterator[AnyRef]): Iterator[(String, Int)] = {
val writer = new WKTWriter()
iterator.flatMap(cur => {
val cPoly = cur.asInstanceOf[Polygon]
// TODO is it efficient to create this collection? Is this a proper iterator 2 iterator transformation?
List(((writer.write(cPoly), cPoly.getUserData.asInstanceOf[Int])).iterator
})
}
def createSpatialRDDFromLinestringDataSet(geoDataset: Dataset[WKTGeometryWithPayload]): RDD[Polygon] = {
geoDataset.rdd.mapPartitions(iterator => {
val reader = new WKTReader()
iterator.flatMap(cur => {
try {
reader.read(cur.lineString) match {
case p: Polygon => {
val polygon = p.asInstanceOf[Polygon]
polygon.setUserData(cur.payload)
List(polygon).iterator
}
case _ => throw new NotImplementedError("Multipolygon or others not supported")
}
} catch {
case e: ParseException =>
logger.error("Could not parse")
logger.error(e.getCause)
logger.error(e.getMessage)
None
}
})
})
}
I noticed that already I start to do a lot of work twice (see the link to both methods). Now wanting to be able to handle
https://github.com/geoHeil/geoSparkScalaSample/blob/master/src/main/scala (full code below)
/myOrg/GeoSpark.scala#L82-L84
val joinResult = JoinQuery.SpatialJoinQuery(objectRDD, minimalPolygonCustom, true)
// joinResult.map()
val joinResultCounted = JoinQuery.SpatialJoinQueryCountByKey(objectRDD, minimalPolygonCustom, true)
which is a PairRDD[Polygon, HashSet[Polygon]]
, or respectively PairRDD[Polygon, Int]
how would I need to specify my functions as an Encoder in order to not solve the same problem 2 more times?