TL;DR: using only Spark SQL built-in functions can significantly speed up the computation.
As explained in this answer, Spark SQL native functions perform better than user-defined functions,
so we can try to solve your problem using only Spark SQL native functions.
I show two main implementations. The first uses all the SQL functions available in the latest Spark version
at the time of writing, which is Spark 3.0. The second uses only SQL functions that existed when the question
was asked, i.e. those available in Spark 2.3. All the functions used in this second version are also available
in Spark 2.2.
Spark 3.0 implementation with sql functions
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}
// Build a map from the prefixed columns (prefix stripped from the keys), then drop null values
val mapFromPrefixedColumns = map_filter(
  map(
    raw.columns
      .filter(_.startsWith("columnPrefix/"))
      .flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*
  ),
  (_, v) => v.isNotNull
)
// Parse the JSON string column into a map; use an empty map when the column is null or empty
val mapFromOriginalMap = when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
  from_json(col("originalMap"), MapType(StringType, StringType))
).otherwise(
  map()
)
// Merge both maps into a single column
val comprehensiveMapExpr = map_concat(mapFromPrefixedColumns, mapFromOriginalMap)
raw.withColumn("allMap", comprehensiveMapExpr)
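The `map` function expects alternating key and value arguments, which is why the column list is expanded with `flatMap`. As a minimal plain-Scala sketch of that step (no Spark needed), strings stand in for `lit(...)` and `col(...)`; the `columns` list and the `keyOf` helper are hypothetical names introduced only for illustration:

```scala
// Hypothetical column list mirroring the example dataframe.
val columns = Seq("id", "columnPrefix/column1", "columnPrefix/column2", "originalMap")

// Same prefix stripping as in the Spark expression:
// drop everything up to the first '/', then drop the '/' itself.
def keyOf(column: String): String = column.dropWhile(_ != '/').tail

// Alternating key/value arguments, in the order `map` expects them.
// The strings only describe which Column would be passed at each position.
val mapArgs: Seq[String] = columns
  .filter(_.startsWith("columnPrefix/"))
  .flatMap(c => Seq(s"lit(${keyOf(c)})", s"col($c)"))

mapArgs.foreach(println)
```

Because the columns are filtered on the prefix first, `keyOf` always finds a `/`. For a fixed prefix, `c.stripPrefix("columnPrefix/")` would be an equivalent way to compute the key.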
Spark 2.2 implementation with sql functions
In Spark 2.2, the functions map_concat (available in Spark 2.4) and map_filter (available in Spark 3.0)
do not exist, so I replace them with user-defined functions:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}
// Replacement for map_filter: remove the entries whose value is null
def filterNull(map: Map[String, String]): Map[String, String] = map.toSeq.filter(_._2 != null).toMap
val filter_null_udf = udf(filterNull _)
// Replacement for map_concat: merge two maps; on duplicate keys the value from map2 wins
def mapConcat(map1: Map[String, String], map2: Map[String, String]): Map[String, String] = map1 ++ map2
val map_concat_udf = udf(mapConcat _)
val mapFromPrefixedColumns = filter_null_udf(
  map(
    raw.columns
      .filter(_.startsWith("columnPrefix/"))
      .flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*
  )
)
val mapFromOriginalMap = when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
  from_json(col("originalMap"), MapType(StringType, StringType))
).otherwise(
  map()
)
val comprehensiveMapExpr = map_concat_udf(mapFromPrefixedColumns, mapFromOriginalMap)
raw.withColumn("allMap", comprehensiveMapExpr)
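Since the two helper functions are plain Scala, their behavior can be checked without Spark. The sketch below uses made-up sample maps, and `mapConcatNullSafe` is a hypothetical variant that is not part of the implementation above; it only illustrates one way to tolerate a null argument, which could presumably happen if `from_json` cannot parse the string and returns null.

```scala
// Same logic as the UDF bodies, written with a pattern-matching filter.
def filterNull(map: Map[String, String]): Map[String, String] =
  map.filter { case (_, value) => value != null }

def mapConcat(map1: Map[String, String], map2: Map[String, String]): Map[String, String] =
  map1 ++ map2

// Hypothetical null-tolerant variant (an assumption, not in the original code):
// a null argument is treated as an empty map.
def mapConcatNullSafe(map1: Map[String, String], map2: Map[String, String]): Map[String, String] =
  Option(map1).getOrElse(Map.empty[String, String]) ++
    Option(map2).getOrElse(Map.empty[String, String])

// Made-up sample data: one null value, and one key present in both maps.
val fromColumns = filterNull(Map[String, String]("column1" -> "d", "column2" -> "2", "column3" -> null))
val fromJson = Map[String, String]("column2" -> "override", "column4" -> "k")

println(fromColumns)
println(mapConcat(fromColumns, fromJson))
println(mapConcatNullSafe(fromColumns, null))
```

One difference to keep in mind: with `++`, a key present in both maps silently takes the value from the second map, whereas the built-in map_concat in Spark 3.0 rejects duplicate keys by default (this is controlled by the `spark.sql.mapKeyDedupPolicy` setting). The two implementations therefore only agree when the JSON keys never collide with the prefixed column names.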
Implementation with sql functions without json mapping
The last part of the question contains simplified code that neither maps the JSON column nor filters
null values out of the resulting map. I created the following implementation for this specific case. As it doesn't
use any function added between Spark 2.2 and Spark 3.0, a single version of this implementation is enough:
import org.apache.spark.sql.functions._
val mapFromPrefixedColumns = map(
  raw.columns
    .filter(_.startsWith("columnPrefix/"))
    .flatMap(c => Seq(lit(c), col(c))): _*
)
raw.withColumn("allMap", mapFromPrefixedColumns)
Run
For the following dataframe as input:
+--------------------+--------------------+--------------------+----------------+
|columnPrefix/column1|columnPrefix/column2|columnPrefix/column3|originalMap |
+--------------------+--------------------+--------------------+----------------+
|a |1 |x |{"column4": "k"}|
|b |null |null |null |
|c |null |null |{} |
|null |null |null |null |
|d |2 |null | |
+--------------------+--------------------+--------------------+----------------+
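To try the implementations locally, the input above could be recreated roughly as follows. This is a sketch under assumptions: a local `SparkSession` is created here with an arbitrary application name, `None` stands for a null cell, and the last row holds an empty string in originalMap.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation only; master and app name are assumptions.
val spark = SparkSession.builder().master("local[*]").appName("allMapExample").getOrCreate()
import spark.implicits._

// Each tuple is one row of the input table; None becomes a SQL null.
val raw = Seq[(Option[String], Option[String], Option[String], Option[String])](
  (Some("a"), Some("1"), Some("x"), Some("""{"column4": "k"}""")),
  (Some("b"), None, None, None),
  (Some("c"), None, None, Some("{}")),
  (None, None, None, None),
  (Some("d"), Some("2"), None, Some(""))
).toDF("columnPrefix/column1", "columnPrefix/column2", "columnPrefix/column3", "originalMap")

raw.show(false)
```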
I obtain the following allMap column:
+--------------------------------------------------------+
|allMap |
+--------------------------------------------------------+
|[column1 -> a, column2 -> 1, column3 -> x, column4 -> k]|
|[column1 -> b] |
|[column1 -> c] |
|[] |
|[column1 -> d, column2 -> 2] |
+--------------------------------------------------------+
And for the mapping without the JSON column:
+---------------------------------------------------------------------------------+
|allMap |
+---------------------------------------------------------------------------------+
|[columnPrefix/column1 -> a, columnPrefix/column2 -> 1, columnPrefix/column3 -> x]|
|[columnPrefix/column1 -> b, columnPrefix/column2 ->, columnPrefix/column3 ->] |
|[columnPrefix/column1 -> c, columnPrefix/column2 ->, columnPrefix/column3 ->] |
|[columnPrefix/column1 ->, columnPrefix/column2 ->, columnPrefix/column3 ->] |
|[columnPrefix/column1 -> d, columnPrefix/column2 -> 2, columnPrefix/column3 ->] |
+---------------------------------------------------------------------------------+
Benchmark
I generated an uncompressed CSV file of 10 million lines (about 800 MB), containing one column without the column prefix,
nine columns with the column prefix, and one column containing JSON as a string:
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|id |columnPrefix/column1|columnPrefix/column2|columnPrefix/column3|columnPrefix/column4|columnPrefix/column5|columnPrefix/column6|columnPrefix/column7|columnPrefix/column8|columnPrefix/column9|originalMap |
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|1 |iwajedhor |ijoefzi |der |ob |galsu |ril |le |zaahuz |fuzi |{"column10":"true"}|
|2 |ofo |davfiwir |lebfim |roapej |lus |roum |te |javes |karutare |{"column10":"true"}|
|3 |jais |epciel |uv |piubnak |saajo |doke |ber |pi |igzici |{"column10":"true"}|
|4 |agami |zuhepuk |er |pizfe |lafudbo |zan |hoho |terbauv |ma |{"column10":"true"}|
...
The benchmark reads this CSV file, creates the column allMap, and writes this column to parquet. I ran it on my local machine and obtained the following results:
+--------------------------+--------------------+-------------------------+-------------------------+
| implementations | current (with udf) | sql functions spark 3.0 | sql functions spark 2.2 |
+--------------------------+--------------------+-------------------------+-------------------------+
| execution time | 138 seconds | 48 seconds | 82 seconds |
| improvement from current | 0 % faster | 64 % faster | 40 % faster |
+--------------------------+--------------------+-------------------------+-------------------------+
I also ran the benchmark against the second implementation in the question, which drops the mapping of the JSON column and the filtering of null values in the map:
+--------------------------+-----------------------+------------------------------------+
| implementations | current (with struct) | sql functions without json mapping |
+--------------------------+-----------------------+------------------------------------+
| execution time | 46 seconds | 35 seconds |
| improvement from current | 0 % | 23 % faster |
+--------------------------+-----------------------+------------------------------------+
Of course, this benchmark is rather basic, but it shows an improvement over the implementations that use user-defined functions.
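For anyone repeating the measurement, a minimal timing sketch might look like the following. The `timed` and `improvementPercent` helpers are hypothetical names, and the dummy workload only stands in for the read, transform and write job described above.

```scala
// Runs `block` once and returns its result with the elapsed wall-clock time in seconds.
def timed[T](block: => T): (T, Double) = {
  val start = System.nanoTime()
  val result = block
  (result, (System.nanoTime() - start) / 1e9)
}

// Relative reduction in execution time compared to a baseline, in percent.
def improvementPercent(baselineSeconds: Double, candidateSeconds: Double): Double =
  (baselineSeconds - candidateSeconds) / baselineSeconds * 100

// Dummy workload in place of the Spark job (an assumption for illustration).
val (total, seconds) = timed { (1L to 1000000L).sum }
println(f"dummy workload (sum = $total) took $seconds%.3f s")

// Applied to two of the rounded timings from the first table.
println(f"${improvementPercent(138, 82)}%.1f %% faster")
```

Applied to the rounded timings in the tables, this formula gives roughly 65 %, 41 % and 24 %, within a point or two of the figures shown. A single run on a local machine remains a rough indication, so repeating each run several times would give a more reliable comparison.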
Conclusion
When you have a performance issue and you use user-defined functions, it can be a good idea to try replacing
them with Spark SQL functions.