I'm building a tail-recursive function that reads multiple hdfs paths and merges all of them into a single data-frame. The function works perfectly as long as all the path exist, if not, the function fails and does not finish joining the data of the paths that do exist. To solve this problem I have tried to handle the error using try/catch but have not been successful.
The error says: could not optimize @tailrec annotated method loop: it contains a recursive call not in tail position
My function is :
def getRangeData(toOdate: String, numMonths: Int, pathRoot: String, ColumnsTable: List[String]): DataFrame = {
val dataFrameNull = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
StructType((ColumnsTable :+ "odate").map(columnName => StructField(columnName, StringType, true))))
val rangePeriod = getRangeDate(numMonths, toOdate)
@tailrec
def unionRangeData(rangePeriod: List[LocalDate], pathRoot: String, df: DataFrame = dataFrameNull): DataFrame = {
try {
if (rangePeriod.isEmpty) {
df
}
else {
val month = "%02d".format(rangePeriod.head.getMonthValue)
val year = rangePeriod.head.getYear
val odate = rangePeriod.head.toString
val path = s"${pathRoot}/partition_data_year_id=${year}/partition_data_month_id=${month}"
val columns = ColumnsTable.map(columnName => trim(col(columnName)).as(columnName))
val dfTemporal = spark.read.parquet(path).select(columns: _*).withColumn("odate", lit(odate).cast("date"))
unionRangeData(rangePeriod.tail, pathRoot, df.union(dfTemporal))
}
} catch {
case e: Exception =>
logger.error("path not exist")
dataFrameNull
}
}
unionRangeData(rangePeriod, pathRoot)
}
def getRangeDate(numMonths: Int, toOdate: String, listDate: List[LocalDate] = List()): List[LocalDate] = {
if (numMonths == 0) {
listDate
}
else {
getRangeDate(numMonths - 1, toOdate, LocalDate.parse(toOdate).plusMonths(1).minusMonths(numMonths) :: listDate)
}
}
In advance, thank you very much for your help.