
I want to ingest many small text files via Spark into Parquet. Currently, I use wholeTextFiles and perform some additional parsing.

To be more precise: these small text files are ESRI ASCII Grid files, each at most around 400 kB. GeoTools is used to parse them as outlined below.

Do you see any optimization possibilities? Maybe something to avoid creating unnecessary objects, or to handle the small files better? I wonder whether it would be better to obtain only the file paths and read them manually instead of going through String -> ByteArrayInputStream.

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

import scala.collection.mutable

// JTS geometry classes; newer GeoTools releases use org.locationtech.jts instead
import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.io.WKTWriter
import org.apache.spark.sql.SparkSession
import org.geotools.gce.arcgrid.ArcGridReader
import org.geotools.process.raster.PolygonExtractionProcess

case class RawRecords(path: String, content: String)
case class GeometryId(idPath: String, value: Double, geo: String)

@transient lazy val extractor = new PolygonExtractionProcess()
@transient lazy val writer = new WKTWriter()

def readRawFiles(path: String, parallelism: Int, spark: SparkSession) = {
  import spark.implicits._
  spark.sparkContext
    .wholeTextFiles(path, parallelism)
    .toDF("path", "content")
    .as[RawRecords]
    .mapPartitions(mapToSimpleTypes)
}

def mapToSimpleTypes(iterator: Iterator[RawRecords]): Iterator[GeometryId] = iterator.flatMap(r => {
  // currently instantiated per record, shadowing the lazy val above
  val extractor = new PolygonExtractionProcess()

  // parse the ASCII grid from the in-memory file content, see
  // http://docs.geotools.org/latest/userguide/library/coverage/arcgrid.html
  val readRaster = new ArcGridReader(new ByteArrayInputStream(r.content.getBytes(StandardCharsets.UTF_8))).read(null)

  // vectorize the raster into polygon features
  // TODO maybe consider optimization of known size instead of using growable data structure
  val vectorizedFeatures = extractor.execute(readRaster, 0, true, null, null, null, null).features
  val result = mutable.Buffer[GeometryId]()

  while (vectorizedFeatures.hasNext) {
    val vectorizedFeature = vectorizedFeatures.next()
    val geomWKTLineString = vectorizedFeature.getDefaultGeometry match {
      case g: Geometry => writer.write(g)
    }
    val geomUserdata = vectorizedFeature.getAttribute(1).asInstanceOf[Double]
    result += GeometryId(r.path, geomUserdata, geomWKTLineString)
  }
  result
})
Georg Heiler
  • Please see my answer here if it helps, but again that's with a DataFrame: https://stackoverflow.com/a/45227410/297113 – Jai Prakash Jul 21 '17 at 01:09

1 Answer


I have a few suggestions:

  1. Use wholeTextFiles -> mapPartitions -> convert to Dataset only at the end. Why? If you call mapPartitions on a Dataset, every row is converted from the internal format into a JVM object, which causes additional serialization (see the sketch after this list).
  2. Run Java Mission Control and sample your application. It will show which methods are compiled and where the execution time is spent.
  3. Maybe you can use binaryFiles. It gives you a stream per file, so you can parse it in mapPartitions without reading the whole content into a String first.
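
A minimal sketch of what the first point could look like, assuming the same GeometryId case class as in the question; parseFile is a hypothetical helper standing in for the per-file body of mapToSimpleTypes. Parsing happens on the plain RDD, so rows are encoded only once, when the Dataset is created at the end:

import org.apache.spark.sql.{Dataset, SparkSession}

// sketch only: parse on the RDD first, convert to a Dataset at the very end
def readRawFilesRddFirst(path: String, parallelism: Int, spark: SparkSession): Dataset[GeometryId] = {
  import spark.implicits._
  spark.sparkContext
    .wholeTextFiles(path, parallelism)                                    // RDD[(path, content)], no Row objects involved
    .mapPartitions(_.flatMap { case (p, content) => parseFile(p, content) })
    .toDS()                                                               // rows are encoded only here
}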
T. Gawęda
  • Shouldn't Tungsten's compression make up for all the object problems? Is there something like `wholeBinaryFiles` which would give me the path and the whole file? – Georg Heiler May 21 '17 at 20:56
  • @GeorgHeiler You can use `SparkContext.binaryFiles`; it returns pairs of (file path, `PortableDataStream`). – T. Gawęda May 21 '17 at 21:11
  • What type of stream is the `PortableDataStream`? When using `new ArcGridReader(r._2).read(null)` I get an `ERROR arcgrid: Unsupported input type`. Any regular input stream should be supported by https://github.com/geotools/geotools/blob/master/modules/plugin/arcgrid/src/main/java/org/geotools/gce/arcgrid/ArcGridReader.java#L119 – Georg Heiler May 22 '17 at 05:59
  • Edit: it works fine, I was missing a call to `open` on the `PortableDataStream`. I will do some profiling now. – Georg Heiler May 22 '17 at 07:02
  • I ultimately want to have a DataFrame to dump to Parquet. https://stackoverflow.com/questions/29686573/spark-obtaining-file-name-in-rdds/36356253#36356253 uses `spark.read.text`. Do you think using this instead would be better? But that again would result in many `String` objects. – Georg Heiler May 23 '17 at 05:02
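
A minimal sketch of the binaryFiles variant discussed in the comments above, assuming ArcGridReader accepts the DataInputStream returned by PortableDataStream.open() (the missing open() call mentioned above); the method name readBinaryFiles is illustrative only, and the parsing mirrors the question's code:

import scala.collection.mutable

import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.io.WKTWriter
import org.apache.spark.sql.{Dataset, SparkSession}
import org.geotools.gce.arcgrid.ArcGridReader
import org.geotools.process.raster.PolygonExtractionProcess

// sketch only: binaryFiles hands each file to the executor as a PortableDataStream,
// so the content never has to be materialized as a String first
def readBinaryFiles(path: String, parallelism: Int, spark: SparkSession): Dataset[GeometryId] = {
  import spark.implicits._
  spark.sparkContext
    .binaryFiles(path, parallelism)                      // RDD[(path, PortableDataStream)]
    .mapPartitions { iter =>
      val extractor = new PolygonExtractionProcess()     // one instance per partition, not per file
      val writer = new WKTWriter()
      iter.flatMap { case (p, stream) =>
        // open() returns a DataInputStream that ArcGridReader can consume directly
        val raster = new ArcGridReader(stream.open()).read(null)
        val features = extractor.execute(raster, 0, true, null, null, null, null).features
        val result = mutable.Buffer[GeometryId]()
        while (features.hasNext) {
          val f = features.next()
          val wkt = f.getDefaultGeometry match { case g: Geometry => writer.write(g) }
          result += GeometryId(p, f.getAttribute(1).asInstanceOf[Double], wkt)
        }
        result
      }
    }
    .toDS()
}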