I'm building a tail-recursive function that reads multiple HDFS paths and merges all of them into a single DataFrame. The function works perfectly as long as all the paths exist; if any of them does not, the function fails and never finishes joining the data from the paths that do exist. To solve this I have tried to handle the error with try/catch, but I have not been successful.

The error says: could not optimize @tailrec annotated method loop: it contains a recursive call not in tail position

My function is:

def getRangeData(toOdate: String, numMonths: Int, pathRoot: String, ColumnsTable: List[String]): DataFrame = {

    val dataFrameNull = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
      StructType((ColumnsTable :+ "odate").map(columnName => StructField(columnName, StringType, true))))

    val rangePeriod = getRangeDate(numMonths, toOdate)

    @tailrec
    def unionRangeData(rangePeriod: List[LocalDate], pathRoot: String, df: DataFrame = dataFrameNull): DataFrame = {
      try {
        if (rangePeriod.isEmpty) {
          df
        }
        else {
          val month = "%02d".format(rangePeriod.head.getMonthValue)
          val year = rangePeriod.head.getYear
          val odate = rangePeriod.head.toString

          val path = s"${pathRoot}/partition_data_year_id=${year}/partition_data_month_id=${month}"
          val columns = ColumnsTable.map(columnName => trim(col(columnName)).as(columnName))
          val dfTemporal = spark.read.parquet(path).select(columns: _*).withColumn("odate", lit(odate).cast("date"))

          unionRangeData(rangePeriod.tail, pathRoot, df.union(dfTemporal))
        }
      } catch {
        case e: Exception =>
          logger.error("path not exist")
          dataFrameNull
      }
    }

    unionRangeData(rangePeriod, pathRoot)
  }

  def getRangeDate(numMonths: Int, toOdate: String, listDate: List[LocalDate] = List()): List[LocalDate] = {
    if (numMonths == 0) {
      listDate
    }
    else {
      getRangeDate(numMonths - 1, toOdate, LocalDate.parse(toOdate).plusMonths(1).minusMonths(numMonths) :: listDate)
    }
  }

Thank you very much in advance for your help.

Beto Javi

  • I don't really see much point in worrying about tail recursion when dealing with Spark Datasets. It feels more like worrying about tyre pressure for a train. – sarveshseri Sep 29 '20 at 00:07
  • I don't understand your point; please explain. – Beto Javi Sep 29 '20 at 15:10
  • Tail recursion is a technique for avoiding unnecessary stack build-up with recursive function calls. Most Spark operations (transformations) produce an AST representing the computation graph, which is just a data structure. This computation graph is optimised and computed only when a consuming operation is performed, and that computation happens in a distributed environment. This means the whole concept of tail recursion is not really relevant (and should not be) to a Spark application. – sarveshseri Sep 30 '20 at 07:04
  • Generally speaking (there can be exceptions in some cases), if you ever need to worry about tail recursion in a Spark application then you are doing things the wrong way, similar to putting tyres on a train and then worrying about tyre pressure. – sarveshseri Sep 30 '20 at 07:05

1 Answer

I would suggest you remove the try/catch construct from unionRangeData entirely and use it instead at the call site at the bottom of getRangeData.
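
For example, a minimal sketch of that restructuring (reusing spark, logger, ColumnsTable, dataFrameNull and rangePeriod from your question; only the placement of the error handling changes):

    @tailrec
    def unionRangeData(rangePeriod: List[LocalDate], pathRoot: String, df: DataFrame = dataFrameNull): DataFrame = {
      if (rangePeriod.isEmpty) {
        df
      } else {
        val month = "%02d".format(rangePeriod.head.getMonthValue)
        val year = rangePeriod.head.getYear
        val odate = rangePeriod.head.toString

        val path = s"${pathRoot}/partition_data_year_id=${year}/partition_data_month_id=${month}"
        val columns = ColumnsTable.map(columnName => trim(col(columnName)).as(columnName))
        val dfTemporal = spark.read.parquet(path).select(columns: _*).withColumn("odate", lit(odate).cast("date"))

        // With no try/catch around it, the recursive call is the last expression
        // of this branch, so @tailrec can be satisfied.
        unionRangeData(rangePeriod.tail, pathRoot, df.union(dfTemporal))
      }
    }

    // Error handling moves to the single, non-recursive call site.
    try {
      unionRangeData(rangePeriod, pathRoot)
    } catch {
      case e: Exception =>
        logger.error("path not exist")
        dataFrameNull
    }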

Alternatively, you can wrap the call in scala.util.Try: Try(unionRangeData(rangePeriod, pathRoot)), and use one of its combinators to perform your logging or provide a default value in the error case.
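
For instance, a small sketch under the same assumptions (names taken from your question), using Try's recover combinator to log the failure and fall back to the empty DataFrame:

    import scala.util.Try

    val result: DataFrame =
      Try(unionRangeData(rangePeriod, pathRoot))
        .recover { case e: Exception =>
          // Log and substitute the empty DataFrame when any path is missing.
          logger.error("path not exist")
          dataFrameNull
        }
        .get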

Related post which explains why the Scala compiler cannot perform tail call optimization inside try-catch: Why won't Scala optimize tail call with try/catch?