I've read this section: https://spark.apache.org/docs/latest/programming-guide.html (see Passing Functions to Spark), but my use case is a typed Dataset with my own case classes, and I'm trying to use a singleton object to hold the mapping method. I'm wondering how best to package the functionality I need so that this stage (transforming a Dataset of one type to another and writing it to Parquet) performs well.
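For context, the pattern from that section, adapted from RDDs to a typed Dataset, looks roughly like this (the names here are placeholders, not my real code):

import org.apache.spark.sql.{Dataset, SparkSession}

object MyFunctions {
  // method on a singleton object: passing MyFunctions.toUpper to map ships
  // only the function itself, not an enclosing class instance
  def toUpper(s: String): String = s.toUpperCase
}

def transform(spark: SparkSession, ds: Dataset[String]): Dataset[String] = {
  import spark.implicits._ // provides the Encoder[String] that map needs
  ds.map(MyFunctions.toUpper)
}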
Currently, this stage is taking an incredibly long time (~1.5 hours) for about 3 million rows, producing around 880 MB of Parquet output in S3.
I am running in cluster mode with min executors = 3, max executors = 10, 4 cores per executor, and 8 GB of driver memory.
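Concretely, I believe that corresponds to settings along these lines (a sketch assuming Spark's standard dynamic allocation keys; the app name is a placeholder, and in cluster mode the driver memory is actually passed to spark-submit rather than set in code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("C1-to-C2")                                   // placeholder name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "3")
  .config("spark.dynamicAllocation.maxExecutors", "10")
  .config("spark.executor.cores", "4")
  // spark.driver.memory=8g is given to spark-submit, since in cluster mode
  // the driver JVM is already running by the time this code executes
  .getOrCreate()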
--
High-level overview of the code:
I am mapping one case class, C1, to another case class, C2. C1 and C2 each have about 16 fields of various types, such as java.sql.Timestamp, Option[String], Option[Int], String, Int, and BigInt.
case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)
In order to map from C1 to C2, I need the functionality (static methods) of a very large Java class J that I copied over from https://github.com/drtimcooper/LatLongToTimezone.
public class J {
    public static String getValue(float v) { ... }
}
I have written the mapping function inside a utility object, Util, which also contains many other helper functions called by the mapping function.
=========
Basically my code flow looks like this:
case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)
// very large Java class J that only contains static methods
public class J {
    public static String getValue(float v) { ... }
    ...
}
object Util {
  def m1(i: Int): Int = ...

  def m2(l: Option[BigDecimal], l2: Option[BigDecimal]): Int = {
    // unwraps the Options and delegates to the static Java method
    J.getValue(l.get, l2.get)
  }

  ...

  def convert_C1_to_C2(c1: C1): C2 = {
    C2(
      field1 = m1(c1.field1),
      field2 = m2(c1.field2, c1.field3),
      ...
    )
  }
}
dataframe.as[C1]
  .map(Util.convert_C1_to_C2)
  .write
  .mode(SaveMode.Overwrite)
  .parquet("s3a://s3Path")
Is there a more efficient way to write this? Or can anyone point out any glaring errors in the way I have done it? Looking at my code, I'm not sure why it is taking so darn long to finish the task.
I've already tried coalescing to, say, 16 partitions to decrease the number of files in S3, but this seems to make the job run much slower. Typically there would be 64 partitions without any coalescing.
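For reference, the coalesce attempt looked roughly like this (same flow as above, with 16 being the value I tried):

dataframe.as[C1]
  .map(Util.convert_C1_to_C2)
  .coalesce(16)  // fewer output files, but the map stage also runs in only 16 tasks
  .write
  .mode(SaveMode.Overwrite)
  .parquet("s3a://s3Path")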