
The scenario is the following:

I have a Spark SQL program which performs an ETL process on several Hive tables. These tables were imported from a Teradata database using Sqoop as raw text with Snappy compression (unfortunately the Avro format does not work with the Teradata connector). The Spark SQL process takes about 1 hour and 15 minutes to complete.

To improve performance I thought I would convert the tables to a more efficient format, such as Parquet, before running the Spark SQL process. According to the documentation and online discussions this should bring a significant boost compared to raw text (even when compressed with Snappy, which is not splittable on raw text). I therefore converted all the Hive tables to Parquet format with Snappy compression and launched the Spark SQL process on these tables with the same settings (num-executors, driver-memory, executor-memory). The process finished in 1 hour and 20 minutes. This was very surprising to me: I didn't expect a 30x boost like the ones reported in some discussions, but I was of course expecting an improvement.
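For reference, the kind of conversion I mean is roughly the following sketch (it uses the same HiveContext, sqc, that is set up in the snippet further down; the target table name is just illustrative):

// Sketch of the per-table conversion (target table name is illustrative)
sqc.sql("SET parquet.compression=SNAPPY")
sqc.sql(
    s"""CREATE TABLE $swamp_db.DBU_vcliff_parquet
       |STORED AS PARQUET
       |AS SELECT * FROM $swamp_db.DBU_vcliff
     """.stripMargin)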

The operations performed in the Spark program are for the most part joins and filters (WHERE conditions), as shown in the following snippet:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(conf)
val sqc = new HiveContext(sc)
sqc.sql("SET hive.exec.compress.output=true")
sqc.sql("SET parquet.compression=SNAPPY")


var vcliff = sqc.read.table(s"$swamp_db.DBU_vcliff")
var vtktdoc = sqc.read.table(s"$swamp_db.DBU_vtktdoc")
var vasccrmtkt = sqc.read.table(s"$swamp_db.DBU_vasccrmtkt")

val numPartitions = 7 * 16

// register temp tables so they can be referenced in the SQL below
vcliff.registerTempTable("vcliff")
vtktdoc.registerTempTable("vtktdoc")
vasccrmtkt.registerTempTable("vasccrmtkt")


var ORI_TktVCRAgency = sqc.sql(
    s"""
       |            SELECT tic.CODCLI,
       |            tic.CODARLPFX,
       |            tic.CODTKTNUM,
       |            tic.DATDOCISS,
       |            vloc.CODTHR,
       |            vloc.NAMCMPNAMTHR,
       |            vloc.CODAGNCTY,
       |            vloc.NAMCIT,
       |            vloc.NAMCOU,
       |            vloc.CODCOU,
       |            vloc.CODTYPTHR,
       |            vloc.CODZIP,
       |            vcom.CODCOMORGLEVDPC,
       |            vcom.DESCOMORGLEVDPC,
       |            vcom.CODCOMORGLEVRMX,
       |            vcom.DESCOMORGLEVRMX,
       |            vcom.CODCOMORGLEVSALUNT,
       |            vcom.CODPSECOMORGCTYLEVSALUNT,
       |            vcom.DESCOMORGLEVSALUNT,
       |            vcom.CODCOMORGLEVRPR,
       |            vcom.CODPSECOMORGCTYLEVRPR,
       |            vcom.DESCOMORGLEVRPR,
       |            vcom.CODCOMORGLEVCTYCNL,
       |            vcom.CODPSECOMORGCTYLEVCTYCNL,
       |            vcom.DESCOMORGLEVCTYCNL,
       |            vcom.CODCOMORGLEVUNT,
       |            vcom.CODPSECOMORGCTYLEVUNT,
       |            vcom.DESCOMORGLEVUNT,
       |            vcli.DESCNL
       |            FROM $swamp_db.DBU_vlocpos vloc
       |                LEFT JOIN $swamp_db.DBU_vcomorghiemktgeo vcom ON vloc.codtypthr = vcom.codtypthr
       |            AND vloc.codthr = vcom.codthr
       |            LEFT JOIN TicketDocCrm tic ON tic.codvdt7 = vloc.codthr
       |            LEFT JOIN vcliff vc ON vc.codcli = tic.codcli
       |            LEFT JOIN $swamp_db.DBU_vclieml vcli ON vc.codcli = vcli.codcli
     """.stripMargin)

ORI_TktVCRAgency.registerTempTable("ORI_TktVCRAgency")

[...]

var TMP_workTemp = sqc.sql(
    s"""
       |SELECT *
       |FROM TicketDocCrm
       |            WHERE CODPNRREF != ''
       |            AND (DESRTGSTS LIKE '%USED%'
       |            OR DESRTGSTS LIKE '%OK%'
       |            OR DESRTGSTS LIKE '%CTRL%'
       |            OR DESRTGSTS LIKE '%RFND%'
       |            OR DESRTGSTS LIKE '%RPRT%'
       |            OR DESRTGSTS LIKE '%LFTD%'
       |            OR DESRTGSTS LIKE '%CKIN%')
     """.stripMargin)

TMP_workTemp.registerTempTable("TMP_workTemp")

var TMP_workTemp1 = sqc.sql(
    s"""
       |SELECT *
       |FROM TMP_workTemp w
       |INNER JOIN
       |    (SELECT CODTKTNUM as CODTKTNUM_T
       |    FROM (
       |        SELECT CODCLI, CODTKTNUM, COUNT(*) as n
       |        FROM TMP_workTemp
       |        GROUP BY CODCLI, CODTKTNUM
       |        HAVING n > 1)
       |    a) b
       |ON w.CODTKTNUM = b.CODTKTNUM_T
     """.stripMargin).drop("CODTKTNUM_T")

[...]

The cluster is composed of 2 masters and 7 workers. Each node has:

  • 16 CPU cores
  • 110 GB RAM

Spark runs on YARN.

Does anyone have any insight into why I am not getting a performance improvement after switching from raw text to Parquet before processing the data in Spark?

revy

2 Answers


Short answer.

It is not true that Parquet will outperform raw text for all types of queries.

TLDR;

Parquet is a columnar store (to picture what a columnar store is, think of every column of the table being stored in a separate file, instead of files storing whole rows), and this layout improves performance for analytical (OLAP) workloads.

Here is an example of why storing data in a columnar fashion (like Parquet) can improve query performance significantly. Assume you have a table with 300 columns and you want to run the following query.

SELECT avg(amount)
FROM my_big_table 

In the above query, you are only interested in the average value of the amount column.

If Spark had to execute this on raw text, it would first use the schema you provided to split each line and then parse out the amount column; extracting the amount column from the 300-odd columns of my_big_table takes considerable compute time.

If Spark instead computes the average of amount from a Parquet store, it only has to read the Parquet blocks holding the amount column data (remember, every column of the table is stored separately in Parquet). Parquet can further improve performance by storing a lot of metadata and using column-level compression.
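For example, a minimal sketch against such a Parquet-backed table (using a HiveContext as in your code; my_big_table is the hypothetical table from above):

// Minimal sketch: with a Parquet-backed table, Spark only scans the
// blocks of the `amount` column instead of parsing all ~300 columns per row.
val avgAmount = sqc.sql("SELECT avg(amount) FROM my_big_table")
avgAmount.explain()   // inspect the physical plan to confirm the pruned read
avgAmount.show()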

You should read this SO post.

Now back to your question: most of your queries run SELECT *, meaning you read all of the data into Spark and then join or filter on some values. Your second query will not get much of a boost from a Parquet table, because you are reading all of the columns; Parquet may even be a costlier choice there, since you potentially end up reading more files than you would have with raw text.
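If the downstream steps only need a handful of columns, projecting them explicitly lets Parquet's column pruning actually pay off. A sketch based on your second query (the column list and the temp-table name are illustrative; keep whichever columns you really use later):

// Sketch: select only the columns used downstream instead of SELECT *,
// so a Parquet-backed TicketDocCrm can skip the remaining columns entirely.
val TMP_workTempPruned = sqc.sql(
    """
      |SELECT CODCLI, CODTKTNUM, CODPNRREF, DESRTGSTS
      |FROM TicketDocCrm
      |WHERE CODPNRREF != ''
    """.stripMargin)
TMP_workTempPruned.registerTempTable("TMP_workTempPruned")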

Filtering is faster on Parquet in some cases, but not always; it depends on your data.

To summarize, you should choose a storage format based on the types of queries you are going to run and the kind of data you have.

Sudev Ambadi
  • Yes, that's what I was thinking. I will probably get better performance with a non-columnar binary format like Avro or SequenceFile, but I have to take the conversion time into account. Thanks for the good points. – revy Jul 29 '18 at 10:39
  • Data storage is one thing, but you should also look at partitioning and bucketing your datasets so that you avoid shuffles. If you are satisfied with the answer, please accept it. – Sudev Ambadi Jul 30 '18 at 06:30

A couple of points I observed:

  • hive.exec.compress.output=true -- this ensures that the final output of a Hive query is compressed. But in this case you are reading the data from Hive with Spark, so it has no impact on performance.
  • Check the number of partitions of the DataFrames and make sure there are enough of them, so that the executors process the data in parallel.

To check the partitions:

vcliff.rdd.getNumPartitions
  • Repartition the DataFrames by their most frequently used columns, so that Spark can avoid a shuffle when performing aggregations such as joins. If the most frequently used column has many distinct values, use bucketing on that column instead of partitioning, so that Spark distributes the data evenly across partitions rather than skewing it into just one or two (see the bucketing sketch after the snippet below).

    // requires: import org.apache.spark.sql.functions.col
    vcliff.repartition(numPartitions, col("codcli"))

    TicketDocCrm.repartition(numPartitions, col("DESRTGSTS"))
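For bucketing, here is a sketch with the DataFrameWriter API (this assumes Spark 2.x; the bucket count and the target table name are illustrative):

    // Sketch (assumes Spark 2.x; bucket count and table name are illustrative)
    vcliff.write
        .bucketBy(112, "codcli")       // bucket on the join key
        .sortBy("codcli")
        .format("parquet")
        .saveAsTable("DBU_vcliff_bucketed")

If both sides of a join are bucketed on the join key with the same number of buckets, Spark can usually perform the join without a full shuffle.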

Lakshman Battini