The scenario is the following:
I have a SparkSQL program that performs an ETL process on several Hive tables. These tables were imported from a Teradata database using Sqoop as raw text with Snappy compression (unfortunately the Avro format does not work with the Teradata connector). The SparkSQL process takes about 1 hour and 15 minutes to complete.
To improve performance I decided to convert the tables to a more efficient format, such as Parquet, before running the SparkSQL process. According to the documentation and online discussions this should bring a significant boost compared to raw text (even when compressed with Snappy, which is not splittable for raw text). So I converted all the Hive tables to Parquet format with Snappy compression and launched the SparkSQL process on them with the same settings (num-executors, driver-memory, executor-memory). The process finished in 1 hour and 20 minutes. This was very surprising to me: I didn't expect a 30x boost like the one reported in some discussions, but I was of course expecting an improvement.
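For reference, the kind of conversion I mean is along these lines (a minimal sketch; the table and database names are illustrative, not the actual ones):

import org.apache.spark.sql.hive.HiveContext

// Sketch of the text-to-Parquet conversion; names are illustrative.
val hc = new HiveContext(sc)
hc.setConf("spark.sql.parquet.compression.codec", "snappy")

// Read the original text-backed Hive table and rewrite it as a Parquet table.
val df = hc.read.table("swamp_db.DBU_vcliff")
df.write
  .format("parquet")
  .mode("overwrite")
  .saveAsTable("swamp_db.DBU_vcliff_parquet")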
The operations performed in the Spark program are mostly joins and filters (WHERE conditions), as shown in the following snippet:
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(conf) // conf is a SparkConf defined earlier
val sqc = new HiveContext(sc)
sqc.sql("SET hive.exec.compress.output=true")
sqc.sql("SET parquet.compression=SNAPPY")
var vcliff = sqc.read.table(s"$swamp_db.DBU_vcliff")
var vtktdoc = sqc.read.table(s"$swamp_db.DBU_vtktdoc")
var vasccrmtkt = sqc.read.table(s"$swamp_db.DBU_vasccrmtkt")
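// presumably one partition per core: 7 workers x 16 cores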
val numPartitions = 7 * 16
// register as temporary tables (note: registerTempTable does not cache the data)
vcliff.registerTempTable("vcliff")
vtktdoc.registerTempTable("vtktdoc")
vasccrmtkt.registerTempTable("vasccrmtkt")
var ORI_TktVCRAgency = sqc.sql(
s"""
| SELECT tic.CODCLI,
| tic.CODARLPFX,
| tic.CODTKTNUM,
| tic.DATDOCISS,
| vloc.CODTHR,
| vloc.NAMCMPNAMTHR,
| vloc.CODAGNCTY,
| vloc.NAMCIT,
| vloc.NAMCOU,
| vloc.CODCOU,
| vloc.CODTYPTHR,
| vloc.CODZIP,
| vcom.CODCOMORGLEVDPC,
| vcom.DESCOMORGLEVDPC,
| vcom.CODCOMORGLEVRMX,
| vcom.DESCOMORGLEVRMX,
| vcom.CODCOMORGLEVSALUNT,
| vcom.CODPSECOMORGCTYLEVSALUNT,
| vcom.DESCOMORGLEVSALUNT,
| vcom.CODCOMORGLEVRPR,
| vcom.CODPSECOMORGCTYLEVRPR,
| vcom.DESCOMORGLEVRPR,
| vcom.CODCOMORGLEVCTYCNL,
| vcom.CODPSECOMORGCTYLEVCTYCNL,
| vcom.DESCOMORGLEVCTYCNL,
| vcom.CODCOMORGLEVUNT,
| vcom.CODPSECOMORGCTYLEVUNT,
| vcom.DESCOMORGLEVUNT,
| vcli.DESCNL
| FROM $swamp_db.DBU_vlocpos vloc
| LEFT JOIN $swamp_db.DBU_vcomorghiemktgeo vcom ON vloc.codtypthr = vcom.codtypthr
| AND vloc.codthr = vcom.codthr
| LEFT JOIN TicketDocCrm tic ON tic.codvdt7 = vloc.codthr
| LEFT JOIN vcliff vc ON vc.codcli = tic.codcli
| LEFT JOIN $swamp_db.DBU_vclieml vcli ON vc.codcli = vcli.codcli
""".stripMargin)
ORI_TktVCRAgency.registerTempTable("ORI_TktVCRAgency")
[...]
var TMP_workTemp = sqc.sql(
s"""
|SELECT *
|FROM TicketDocCrm
| WHERE CODPNRREF != ''
| AND (DESRTGSTS LIKE '%USED%'
| OR DESRTGSTS LIKE '%OK%'
| OR DESRTGSTS LIKE '%CTRL%'
| OR DESRTGSTS LIKE '%RFND%'
| OR DESRTGSTS LIKE '%RPRT%'
| OR DESRTGSTS LIKE '%LFTD%'
| OR DESRTGSTS LIKE '%CKIN%')
""".stripMargin)
TMP_workTemp.registerTempTable("TMP_workTemp")
var TMP_workTemp1 = sqc.sql(
s"""
|SELECT *
|FROM TMP_workTemp w
|INNER JOIN
| (SELECT CODTKTNUM as CODTKTNUM_T
| FROM (
| SELECT CODCLI, CODTKTNUM, COUNT(*) as n
| FROM TMP_workTemp
| GROUP BY CODCLI, CODTKTNUM
| HAVING n > 1) a) b
|ON w.CODTKTNUM = b.CODTKTNUM_T
""".stripMargin).drop("CODTKTNUM_T")
[...]
The cluster is composed of 2 masters and 7 workers. Each node has:
- 16 CPU cores
- 110 GB of RAM
Spark runs on YARN.
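For completeness, the resource settings are kept identical across the raw-text and Parquet runs; a minimal sketch of how they are set (the values below are illustrative, not the actual ones):

import org.apache.spark.SparkConf

// Illustrative resource settings; the real values are the same for both runs.
val conf = new SparkConf()
  .setAppName("etl-job")                   // hypothetical app name
  .set("spark.executor.instances", "14")   // equivalent to --num-executors
  .set("spark.executor.memory", "40g")     // equivalent to --executor-memory
  .set("spark.driver.memory", "8g")        // equivalent to --driver-memory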
Does anyone have any insight into why I am not getting any performance improvement by switching from raw text to Parquet before processing the data in Spark?