1

I know about How to store custom objects in Dataset? but still, it is not really clear for me how to build this custom encoder which properly serializes to multiple fields. Manually, I created some functions https://github.com/geoHeil/geoSparkScalaSample/blob/master/src/main/scala/myOrg/GeoSpark.scala#L122-L154 wich map a Polygon back and forth between Dataset - RDD - Dataset by mapping the objects to primitive types spark can handle i.e. a tuple (String, Int)(edit: full code below).

For example, to go from the Polygon Object to a tuple of (String, Int) I use the following

def writeSerializableWKT(iterator: Iterator[AnyRef]): Iterator[(String, Int)] = {
    val writer = new WKTWriter()
    iterator.flatMap(cur => {
      val cPoly = cur.asInstanceOf[Polygon]
      // TODO is it efficient to create this collection? Is this a proper iterator 2 iterator transformation?
      List(((writer.write(cPoly), cPoly.getUserData.asInstanceOf[Int])).iterator
    })
  }
 def createSpatialRDDFromLinestringDataSet(geoDataset: Dataset[WKTGeometryWithPayload]): RDD[Polygon] = {
    geoDataset.rdd.mapPartitions(iterator => {
      val reader = new WKTReader()
      iterator.flatMap(cur => {
        try {
          reader.read(cur.lineString) match {
            case p: Polygon => {
              val polygon = p.asInstanceOf[Polygon]
              polygon.setUserData(cur.payload)
              List(polygon).iterator
            }
            case _ => throw new NotImplementedError("Multipolygon or others not supported")
          }
        } catch {
          case e: ParseException =>
            logger.error("Could not parse")
            logger.error(e.getCause)
            logger.error(e.getMessage)
            None
        }
      })
    })
  }

I noticed that already I start to do a lot of work twice (see the link to both methods). Now wanting to be able to handle

https://github.com/geoHeil/geoSparkScalaSample/blob/master/src/main/scala (full code below)

/myOrg/GeoSpark.scala#L82-L84
 val joinResult = JoinQuery.SpatialJoinQuery(objectRDD, minimalPolygonCustom, true)
  //  joinResult.map()
  val joinResultCounted = JoinQuery.SpatialJoinQueryCountByKey(objectRDD, minimalPolygonCustom, true)

which is a PairRDD[Polygon, HashSet[Polygon]], or respectively PairRDD[Polygon, Int] how would I need to specify my functions as an Encoder in order to not solve the same problem 2 more times?

Community
  • 1
  • 1
Georg Heiler
  • 16,916
  • 36
  • 162
  • 292
  • Please don't post relevant code outside Stack Overflow. Links you provide are likely to become invalid (especially without hashes) rendering this useless for the future readers. – zero323 Mar 14 '17 at 13:16
  • @zero323 I edited the code snippets to contain the full code. Thought it would be more readable. Please have a look at this edited version. For a better reference of the full context I would want to keep the links. – Georg Heiler Mar 14 '17 at 13:20
  • Thanks and please follow the guidelines in the future. Questions which depend on the external resources are as useless as link only answers and _and the shortest code necessary to reproduce it in the question itself_ is the official requirement. "Questions" like http://stackoverflow.com/q/42689512/1560062 to anyone but you. BTW I'd really recommend "permanent" links(with commit hash) if you share code in general. Regarding your question - great answer by [alec](https://stackoverflow.com/users/3072788) you've already linked here is the only sensible option available out there right now. – zero323 Mar 14 '17 at 13:34
  • What is unclear to me is how to register an Encoder based on my 2 methods which provide the serialization to a tuple of (string, int). – Georg Heiler Mar 14 '17 at 13:37
  • `(String, Int)` can use default `Product` `Encoder` and it is not clear what is the purpose of this code and why you need all these conversions in the first place. Parse once, serialize, and move on :) – zero323 Mar 14 '17 at 13:47
  • Polygon doesn't support product. I need to go from hive to spark rdd as only there some suitable indices exist. Then the joined result needs to be stored in hive. – Georg Heiler Mar 14 '17 at 14:02
  • So use generic (Kryo or Java) encoder. These conversions don't make much sense. – zero323 Mar 14 '17 at 14:32
  • Maybe you are right. But will Hive be able to handle that when I persist this binary df as a hive table? – Georg Heiler Mar 14 '17 at 16:10
  • Kryo / Java enc output is just a byte array. It won't have any special meaning for Hive and is a valid type. – zero323 Mar 14 '17 at 16:39

0 Answers0