11

I am trying to access a mid-size Teradata table (~100 million rows) via JDBC in standalone mode on a single node (local[*]).

I am using Spark 1.4.1, set up on a very powerful machine (2 CPUs, 24 cores, 126 GB RAM).

I have tried several memory setups and tuning options to make it work faster, but none of them made a huge impact.

I am sure there is something I am missing. Below is my final try, which took about 11 minutes to get this simple count, versus only 40 seconds to get the count over a JDBC connection through R.

bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()

When I tried with a BIG table (5B records), no results were returned upon completion of the query.

zero323
  • 322,348
  • 103
  • 959
  • 935
Dev Patel
  • 292
  • 1
  • 5
  • 12
  • How do you count using R? – zero323 Aug 24 '15 at 18:28
  • @zero323 - simply using the **RJDBC** and **teradataR** packages after setting up the connection using the Teradata JARs, and then `tdQuery("SELECT COUNT(*) FROM your_table")` – Dev Patel Aug 24 '15 at 18:37
  • 1
    As far as I know, the Spark JDBC data source can push down predicates but the actual execution is done in Spark. It means you have to transfer your data to the Spark cluster. So it is not the same as executing a SQL query over JDBC (the R case). The first thing you should do is cache your data after loading. It won't improve performance for the first query though. – zero323 Aug 24 '15 at 19:03
  • @zero323 - thanks, I realized that after doing some more research on this. I do have a quick question though - **what would be the fastest way to read data in apache spark? is it through Parquet file structure?** – Dev Patel Aug 24 '15 at 19:49
  • 1
    It is probably a good choice, but the first thing you can try before you go this way is the [Teradata Hadoop connector](http://downloads.teradata.com/download/connectivity/teradata-connector-for-hadoop-command-line-edition). It looks like it supports multiple export options including Hive tables. With a single machine, network and disk IO can still be a limiting factor though. – zero323 Aug 24 '15 at 20:10
  • Suggest accepting Gianmario's answer. – samthebest Dec 11 '15 at 15:49

3 Answers

18

All of the aggregation operations are performed after the whole dataset is retrieved into memory as a DataFrame. So doing the count in Spark will never be as efficient as doing it directly in Teradata. Sometimes it's worth pushing some computation into the database by creating views and then mapping those views using the JDBC API.
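For example (a minimal sketch, not from the original answer; the URL and table name are placeholders, and it assumes Teradata accepts a derived table or view as the relation), the aggregation then runs entirely in the database and only one row is transferred:

// hypothetical sketch: map a derived table (or a pre-created view) so that the
// count is computed by Teradata and Spark only receives a single row
val cnt = sqlctx.read.jdbc(
  url = "jdbc:teradata://<HOST>",                          // placeholder URL
  table = "(SELECT COUNT(*) AS cnt FROM your_table) AS t", // or the name of a view
  properties = new java.util.Properties()
)
cnt.show()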

Every time you use the JDBC driver to access a large table you should specify a partitioning strategy; otherwise you will create a DataFrame/RDD with a single partition and overload the single JDBC connection.

Instead you want to try the following API (available since Spark 1.4.0):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", 
  lowerBound = minValue,
  upperBound = maxValue,
  numPartitions = 20,
  connectionProperties = new java.util.Properties()
)

There is also an option to push down some filtering.
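For instance (a sketch, assuming the result of the call above is held in a hypothetical val `df`, and using a placeholder column name), simple comparison filters can be compiled into the WHERE clause of the query that Spark sends over JDBC, so rows are filtered before they cross the network; whether a given filter is pushed down depends on what the source can compile:

// hedged sketch: <SOME_NUMERIC_COLUMN> is a placeholder for a real column
val subset = df.filter(df("<SOME_NUMERIC_COLUMN>") > 100)
subset.count()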

If you don't have a uniformly distributed integral column, you can create custom partitions by specifying custom predicates (WHERE clauses). For example, let's suppose you have a timestamp column and want to partition by date ranges:

val predicates =
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  ).map {
    case (start, end) =>
      s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
  }

predicates.foreach(println)

// Below is the result of how the predicates were formed:
// cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
// cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
// cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
// cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'


sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

It will generate a DataFrame where each partition contains the records of the subquery associated with the corresponding predicate.
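As a quick sanity check (a sketch, with the result of the call above assigned to a hypothetical val `partitioned`), the number of RDD partitions should equal the number of predicates:

// sketch: one partition per predicate, so the four date ranges above yield four partitions
val partitioned = sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)
println(partitioned.rdd.partitions.length)   // expected: 4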

Check the source code at DataFrameReader.scala

Ram Ghadiyaram
  • 28,239
  • 13
  • 95
  • 121
Gianmario Spacagna
  • 1,270
  • 14
  • 12
  • **@zero323**, **@Gianmario Spacagna** if I actually need to read the entire `MySQL` table (and not just get `count`), then how can I improve the *sluggish* performance of `Spark-SQL`? I'm already *parallelizing* the read operation using [`spark.read.jdbc(..numPartitions..)`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,columnName:String,lowerBound:Long,upperBound:Long,numPartitions:Int,connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame) method. – y2k-shubham Mar 06 '18 at 12:26
  • My `MySQL` (`InnoDB`) table has ~ **186M records** weighing around **149 GB** (as per stats shown by `phpMyAdmin`) and I'm using `numPartitions = 32`. [**`Spark 2.2.0`**] I'm on `EMR 5.12.0` with 1 `master`, 1 `task` and 1 `core` (all **`r3.xlarge`**, 8 `vCore`, 30.5 `GiB` memory, 80 `SSD` `GB` storage). I've found that reading `MySQL` table into `DataFrame` fails if I DON'T `limit` the records to ~ **1.5-2M**. It gives a long *stack-trace* that has `javax.servlet.ServletException: java.util.NoSuchElementException: None.get` & `java.sql.SQLException: Incorrect key file for table..` – y2k-shubham Mar 06 '18 at 12:48
5

Does the unserialized table fit into 40 GB? If it starts swapping to disk, performance will decrease dramatically.

Anyway, when you use standard JDBC with ANSI SQL syntax you leverage the DB engine, so if Teradata (I don't know Teradata) holds statistics about your table, a classic "select count(*) from table" will be very fast. Spark, instead, loads your 100 million rows into memory with something like "select * from table" and then performs a count on the RDD rows. It's a pretty different workload.
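To make the difference concrete, here is a sketch (placeholder URL, credentials and table name; assumes the Teradata JDBC driver jar is on the classpath) of what the R path effectively does: the count executes entirely inside the database and only a single number travels over the wire.

// plain JDBC count, no Spark involved: the DB engine does all the work
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:teradata://<HOST>", "<USER>", "<PASSWORD>")
try {
  val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM your_table")
  rs.next()
  println(rs.getLong(1))
} finally {
  conn.close()
}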

  • I think it would, and I also tried increasing memory to 100 GB, but didn't see any improvement. I am not trying to load 100 million rows in memory, but running some aggregate operation such as `count()` on the dataframe or `count(*)` on a temp table, but **Spark** takes too long. I also tried registering a DF as a temp table and did a simple count, but it takes about the same time. `ra1.registerTempTable("ra_dt"); total = sqlContext.sql("select count(*) from ra_dt")` – Dev Patel Aug 24 '15 at 18:41
  • 1
    Yes, but I think that Spark is not pushing the count operation down to the DB engine, so it will load all rows into memory and then perform the count on the DF. – axlpado - Agile Lab Aug 24 '15 at 18:48
  • How many columns do you have in that table? With 100 million rows it's pretty easy to reach 100 GB of unserialized objects. Could you post your table schema? – axlpado - Agile Lab Aug 24 '15 at 18:52
  • I think you're right. I was reading a few other posts online and found that Spark is trying to load the data before applying the count operation. In that case, what would be the ideal way to read this type of data faster in Spark? In other words, **what would be the fastest way to read data in apache spark?** Here is my table schema: root |-- field1: decimal(18,0) (nullable = true) |-- field2: string (nullable = true) |-- field3: date (nullable = true) |-- field4: date (nullable = true) |-- field5: integer (nullable = true) |-- field6: string (nullable = true) – Dev Patel Aug 24 '15 at 19:46
  • 3
    Spark is a distributed processing engine, so the best way to load data into Spark is from a distributed file system or DBMS. In your case, working on a single instance, I think you can only improve performance by specifying partitionColumn, lowerBound, upperBound and numPartitions to improve reading parallelism. If you need to perform other queries after the count you can cache the DF before counting it, so the first count will take its time but the next queries will run in memory and will be faster (see the sketch after these comments). – axlpado - Agile Lab Aug 24 '15 at 20:22
  • Makes sense! Thanks for the answer! – Dev Patel Aug 26 '15 at 15:17
  • How many executors are you running, and how many `--executor-cores`? – Boggio Aug 28 '15 at 14:24
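A minimal sketch of the suggestion in the comments above (parallel read, then cache before counting); the URL, the bounds and the use of `field5` as the partition column are assumptions based on the schema posted above:

// read with several partitions, cache, then count; placeholders throughout
val ra1 = sqlContext.read.jdbc(
  url = "jdbc:teradata://<HOST>",
  table = "your_table",
  columnName = "field5",          // assumed integral column from the schema above
  lowerBound = 0L,
  upperBound = 100000000L,        // assumed range; use the real min/max
  numPartitions = 24,
  connectionProperties = new java.util.Properties()
)
ra1.cache()
ra1.count()   // first count loads the data and materializes the cache
ra1.count()   // later actions run against the in-memory copy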
0

One solution that differs from the others is to save the data from the Oracle table into Avro files (partitioned into many files) stored on Hadoop. This way, reading those Avro files with Spark is a piece of cake since you won't call the DB anymore.
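A sketch of that approach (assuming the external spark-avro package, com.databricks:spark-avro, is on the classpath, and with placeholder URL and paths; the original answer does not prescribe a specific export tool):

// one-off export: read over JDBC once, write many Avro part-files to HDFS
val src = sqlContext.read.jdbc("jdbc:teradata://<HOST>", "your_table", new java.util.Properties())
src.write.format("com.databricks.spark.avro").save("hdfs:///data/your_table_avro")

// later jobs read the Avro files directly and never touch the database
val offline = sqlContext.read.format("com.databricks.spark.avro").load("hdfs:///data/your_table_avro")
offline.count()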

Vasile Surdu
  • 1,203
  • 1
  • 10
  • 11