I have a Spark program that involves join operations on big Hive tables (millions of rows with hundreds of columns). The memory used during these joins is really high. I would like to understand the best way to handle this scenario in Spark on YARN so that the job completes successfully without memory errors. The cluster is composed of 7 workers with 110 GB of RAM and 16 cores each. Consider the following Scala code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object Model1Prep {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName("Modello1_Spark")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        conf.set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
        val sc = new SparkContext(conf)
        val hc = new HiveContext(sc)
        import hc.implicits._


        hc.sql("SET hive.exec.compress.output=true")
        hc.sql("SET parquet.compression=SNAPPY")
        hc.sql("SET spark.sql.parquet.compression.codec=snappy")


        // loading tables into dataframes
        val tableA = hc.read.table("TA")
        val tableB = hc.read.table("TB")
        val tableC = hc.read.table("TC")
        val tableD = hc.read.table("TD")


        // registering tables so they can be queried with SQL
        tableA.registerTempTable("TA")
        tableB.registerTempTable("TB")
        tableC.registerTempTable("TC")
        tableD.registerTempTable("TD")


        val join1 = hc.sql("""
            SELECT
                [many fields]
            FROM TA a
            JOIN TB b ON a.field = b.field
            LEFT JOIN TC c ON a.field = c.field
            WHERE [conditions]
        """)


        val join2 = hc.sql("""
            SELECT
                [many fields]
            FROM TA a
            LEFT JOIN TD d ON a.field = d.field
            WHERE [conditions]
        """)


        // [other operations]


        sc.stop()
    }
}

Considering that the join operations are really expensive on memory, what are my best options here? I know that a dataframe can be persisted both in memory and on disk, possibly using serialization to be more compact in memory at the cost of slower processing time for deserialization (more on here and here). From the code above, the table TA is used in both joins, so it makes sense to persist it:

    //[...]

    import org.apache.spark.storage.StorageLevel

    // persisting: serialized in memory, spilling to disk, replicated on 2 nodes
    tableA.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

    // registering tables
    tableA.registerTempTable("TA")
    tableB.registerTempTable("TB")
    tableC.registerTempTable("TC")
    tableD.registerTempTable("TD")

    //[...]

Should I also persist the other tables in the same way? Or is there anything else I can do to make this code run and complete smoothly?

revy
1 Answer


If you know which field you're joining on, and it's always the same field, then, as this SO answer suggests, you can use the same partitioner for the joined tables so that matching keys end up co-located and the join avoids a full shuffle.
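A minimal sketch of that idea at the RDD level, reusing the tableA and tableB dataframes from the question; the column name "field" is the placeholder from the question's queries, and the names keyedA/keyedB plus the partition count of 200 are illustrative, not prescribed values:

    import org.apache.spark.HashPartitioner

    // hash-partition both sides with the SAME partitioner instance;
    // 200 partitions is an illustrative value, tune it to your data
    val partitioner = new HashPartitioner(200)

    val keyedA = tableA.rdd
        .map(row => (row.getAs[String]("field"), row))
        .partitionBy(partitioner)

    val keyedB = tableB.rdd
        .map(row => (row.getAs[String]("field"), row))
        .partitionBy(partitioner)

    // rows with the same key now sit in the same partition on both
    // sides, so the join runs without shuffling both tables again
    val joined = keyedA.join(keyedB)

If you prefer to stay at the DataFrame level, repartitioning both sides on the join column (tableA.repartition($"field"), available from Spark 1.6 onward) gives a similar co-location effect before the join.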