
I'm trying to get mass elevation data from a TIFF image, and I have a CSV file. The CSV file contains latitude, longitude, and other attributes. I am looping through the CSV file, getting the latitude and longitude, and calling the elevation method; the code is given below. Reference: RasterFrames extracting location information problem

    package main.scala.sample

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import org.locationtech.rasterframes._
    import org.locationtech.rasterframes.datasource.raster._
    import org.locationtech.rasterframes.encoders.CatalystSerializer._
    import geotrellis.raster._
    import geotrellis.vector.Extent
    import org.locationtech.jts.geom.Point

    object SparkSQLExample {

        def main(args: Array[String]) {

            implicit val spark = SparkSession.builder()
            .master("local[*]").appName("RasterFrames")
            .withKryoSerialization.getOrCreate().withRasterFrames
            spark.sparkContext.setLogLevel("ERROR")


            import spark.implicits._

            val example = "https://raw.githubusercontent.com/locationtech/rasterframes/develop/core/src/test/resources/LC08_B7_Memphis_COG.tiff"
            val rf = spark.read.raster.from(example).load()

            val rf_value_at_point = udf((extentEnc: Row, tile: Tile, point: Point) => {
              val extent = extentEnc.to[Extent]
              Raster(tile, extent).getDoubleValueAtPoint(point)
            })

            val spark_file:SparkSession = SparkSession.builder()
            .master("local[1]")
            .appName("SparkByExamples")
            .getOrCreate()

            spark_file.sparkContext.setLogLevel("ERROR")

            println("spark read csv files from a directory into RDD")
            val rddFromFile = spark_file.sparkContext.textFile("point.csv")
            println(rddFromFile.getClass)

            def customF(str: String): String = {
                val lat = str.split('|')(2).toDouble
                val long = str.split('|')(3).toDouble
                val point = st_makePoint(long, lat)
                val test = rf
                    .where(st_intersects(rf_geometry(col("proj_raster")), point))
                    .select(rf_value_at_point(rf_extent(col("proj_raster")), rf_tile(col("proj_raster")), point) as "value")
                return test.toString()
            }
            val rdd2=rddFromFile.map(f=> customF(f))
            rdd2.foreach(t=>println(t))
            spark.stop()

      }
    }

When I run it, I get a NullPointerException. Any help is appreciated.

java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:64)
    at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3416)
    at org.apache.spark.sql.Dataset.filter(Dataset.scala:1490)
    at org.apache.spark.sql.Dataset.where(Dataset.scala:1518)
    at main.scala.sample.SparkSQLExample$.main$scala$sample$SparkSQLExample$$customF$1(SparkSQLExample.scala:49)
  • Don't use `return` in Scala. Like ever. It doesn't do what you think it does. When you `map` to `customF`, it returns from the entire `map`. I don't think that's the main issue, but remove that return and see what happens – sinanspd Jul 14 '21 at 17:33

1 Answer


The function being mapped over the RDD (customF) is not null safe. Try calling customF(null) and see what happens. If it throws an exception, you will have to make sure that rddFromFile does not contain any null or missing values.
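For example, one quick way to rule that out is to drop null and blank lines before the map. This is a minimal sketch reusing rddFromFile and customF from the question:

    // Filter out null/blank lines so customF never sees a missing value.
    val rdd2 = rddFromFile
        .filter(line => line != null && line.trim.nonEmpty)
        .map(customF)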

It is a little hard to tell if that is exactly where the issue is. I think the stack trace of the exception is less helpful than usual because the function is being run in Spark tasks on the workers.

If that is the issue, you could rewrite customF to handle the case where str is null, or change the parameter type to Option[String] and tweak the logic accordingly.
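A minimal sketch of the Option[String] variant follows; the raster lookup body is elided, and the field indices simply mirror the question's pipe-delimited format:

    // Hypothetical null-safe rework of customF: Option(line) turns a
    // null line into None, so the parsing only runs on real values.
    def customF(str: Option[String]): Option[String] =
        str.filter(_.trim.nonEmpty).map { line =>
            val fields = line.split('|')
            val lat = fields(2).toDouble
            val long = fields(3).toDouble
            // ... perform the raster lookup with lat/long as in the question ...
            s"$lat,$long"
        }

    val rdd2 = rddFromFile.map(line => customF(Option(line)))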

By the way, the same thing applies to UDFs. They need to do one of the following (see the sketch after this list):

  1. Accept Option types as input,
  2. Handle the case where each argument is null, or
  3. Only be applied to data with no missing values.
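As an illustration of option 2, here is a sketch of a UDF that guards its argument against null before using it (the parsing logic is hypothetical, not from the original post):

    import org.apache.spark.sql.functions.udf

    // Returning null for a null input keeps the SQL NULL flowing
    // through instead of raising a NullPointerException in the task.
    val latFromLine = udf { line: String =>
        if (line == null) null else line.split('|')(2)
    }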