
Greetings,

I have created a Spark 2.1.1 cluster in Amazon EC2 with m4.large instances, 1 master and 5 slaves to start. My PostgreSQL 9.5 database (t2.large) has a table of over 2 billion rows and 7 columns that I would like to process. I have followed the directions from the Apache Spark website and various other sources on how to connect to and process this data.

My problem is that Spark SQL performance is way slower than my database. My SQL statement (see below in the code) takes about 21 minutes in PSQL, but Spark SQL takes about 42 minutes to finish. My main goal is to measure the performance of PSQL vs Spark SQL, and so far I am not getting the desired results. I would appreciate the help.

Thank you

I have tried increasing fetchSize from 10000 to 100000, caching the DataFrame, increasing numPartitions to 100, setting spark.sql.shuffle.partitions to 2000, doubling my cluster size, and using a larger instance type, and so far I have not seen any improvements.
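
For reference, those tuning attempts correspond roughly to settings like the following (a sketch with the values listed above, not my exact code; the baseline code I am timing is further below):

// Sketch of the tuning attempts described above (values as listed; not the exact code used).
val sparkTuned = SparkSession.builder()
                             .appName("Spark SQL")
                             .config("spark.sql.shuffle.partitions", "2000") // shuffle partitions raised to 2000
                             .getOrCreate();
val tunedDF = sparkTuned.read.format("jdbc")
                        .option("url", DBI_URL)
                        .option("driver", "org.postgresql.Driver")
                        .option("dbtable", "ghcn_all")
                        .option("fetchsize", 100000)   // fetchSize raised from 10000 to 100000
                        .option("numPartitions", 100)  // numPartitions raised to 100
                        .load()
                        .cache();                      // DataFrame cached before querying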

val spark = SparkSession.builder()
                        .appName("Spark SQL")
                        .getOrCreate();
val jdbcDF = spark.read.format("jdbc")
                  .option("url", DBI_URL)
                  .option("driver", "org.postgresql.Driver")
                  .option("dbtable", "ghcn_all")
                  .option("fetchsize", 10000)
                  .load()
                  .createOrReplaceTempView("ghcn_all");

val sqlStatement = "SELECT ghcn_date, element_value/10.0 
FROM ghcn_all 
WHERE station_id = 'USW00094846' 
      AND (ghcn_date >= '2015-01-01' AND ghcn_date <= '2015-12-31') 
      AND qflag IS NULL 
      AND element_type = 'PRCP' 
ORDER BY ghcn_date";

val sqlDF = spark.sql(sqlStatement);

var start:Long = System.nanoTime;
val num_rows:Long = sqlDF.count();
var end:Long = System.nanoTime;
println("Total Row                : " + num_rows);
println("Total Collect Time Lapse : " + ((end - start) / 1000000) + " ms");

2 Answers


There is no good reason for this code to ever run faster on Spark than on the database alone. First of all, it is not even distributed, as you made the same mistake as many before you and did not partition the data.

But more importantly, you actually load the data from the database - as a result, the database has to do at least as much work (and in practice more), then the data has to be sent over the network, parsed by Spark, and processed. You basically do far more work and expect things to be faster - that's not going to happen.

If you want to reliably improve performance on Spark you should at least:

  • Extract data from the database.
  • Write it to efficient distributed storage (not S3, for example).
  • Use proper bucketing and partitioning to enable partition pruning and predicate pushdown.

Then you might have better luck. But again, proper indexing of your data on the cluster should improve performance as well, likely at a lower overall cost.
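
For illustration, a rough sketch of that pipeline (the HDFS path and partition choice are just examples; column names come from the question):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ghcn-etl").getOrCreate()
import spark.implicits._

// 1. Extract once from PostgreSQL (parallelize this read - see the other answer).
val raw = spark.read.format("jdbc")
  .option("url", DBI_URL)
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "ghcn_all")
  .load()

// 2. Write to efficient distributed storage (hypothetical HDFS path), partitioned so that
//    filters on element_type can skip whole directories.
raw.write
  .partitionBy("element_type")
  .parquet("hdfs:///data/ghcn_all_parquet")

// 3. Query the Parquet copy; filters now benefit from partition pruning and
//    predicate pushdown instead of a full JDBC scan.
val df = spark.read.parquet("hdfs:///data/ghcn_all_parquet")
df.filter($"station_id" === "USW00094846" &&
          $"element_type" === "PRCP" &&
          $"qflag".isNull &&
          $"ghcn_date".between("2015-01-01", "2015-12-31"))
  .select($"ghcn_date", ($"element_value" / 10.0).as("element_value"))
  .orderBy("ghcn_date")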


It is very important to set partitionColumn when you read from a SQL database. It is used to parallelize the query, so you should decide which column to use as your partitionColumn.

In your case for example:

val jdbcDF = spark.read.format("jdbc")
                  .option("url", DBI_URL)
                  .option("driver", "org.postgresql.Driver")
                  .option("dbtable", "ghcn_all")
                  .option("fetchsize", 10000)
                  .option("partitionColumn", "ghcn_date")
                  .option("lowerBound", "2015-01-01")
                  .option("upperBound", "2015-12-31")
                  .option("numPartitions",16 )
                  .load()
                  .createOrReplaceTempView("ghcn_all");


  • @howie According to the Spark 2.1.1 documentation, "partitionColumn must be a numeric column from the table in question." Spark 2.4.0 does support a date type in partitionColumn, but I am unable to find any documentation on how to create a Spark 2.4.0 cluster in EC2. Without a numeric column I don't see how I can partition and parallel-read my database. – catalino Mar 30 '19 at 02:48
  • Check your schema and try to find any numeric column :) Do you use EMR on AWS, or did you install Spark yourself on EC2? – howie Mar 30 '19 at 06:19
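
A possible workaround for the numeric-column limitation discussed in the comments (a sketch, not from either poster): DataFrameReader.jdbc also accepts an array of WHERE-clause predicates, one per partition, which lets a date column drive a parallel read even on Spark 2.1.x.

import java.util.Properties

val props = new Properties()
props.setProperty("driver", "org.postgresql.Driver")
props.setProperty("fetchsize", "10000")

// One predicate per partition: 12 monthly ranges over ghcn_date (example ranges for 2015).
val predicates = (1 to 12).map { m =>
  val lo = f"2015-$m%02d-01"
  val hi = if (m == 12) "2016-01-01" else f"2015-${m + 1}%02d-01"
  s"ghcn_date >= '$lo' AND ghcn_date < '$hi'"
}.toArray

// Each predicate becomes one partition, so the read runs as 12 parallel JDBC queries.
val jdbcDF = spark.read.jdbc(DBI_URL, "ghcn_all", predicates, props)
jdbcDF.createOrReplaceTempView("ghcn_all")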