I am running Spark 1.6 on 3 VMs (1 master, 2 slaves), each with 4 cores and 16 GB of RAM.

I can see the workers registered in the Spark master web UI.

I want to retrieve data from my Vertica database to work on it. Since I couldn't get complex queries to run, I tried trivial queries to understand what is going on. The task considered here is a simple one.

My code is:

df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)

And the output is (note: I replaced the slave VM's IP:port with @IPSLAVE):

16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s

As you can see, it takes a really long time. My table is actually quite big (around 220 million rows, 11 fields each), but such a query would execute almost instantly using "normal" SQL (e.g. via pyodbc).

I guess I am misunderstanding or misusing Spark. Would you have some ideas or advice to make it work better?

ZygD
pltrdy

1 Answer


While Spark supports limited predicate pushdown over JDBC, all other operations, such as limits, grouping, and aggregations, are performed internally. Unfortunately, this means that take(4) will fetch the data first and then apply the limit. In other words, your database will execute (assuming no projections and filters) something equivalent to:

SELECT * FROM table 

and the rest will be handled by Spark. There are some optimizations involved (in particular, Spark evaluates partitions iteratively to obtain the number of records requested by LIMIT), but it is still quite an inefficient process compared to database-side optimizations.
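To make the difference concrete, here is a small sketch that uses Python's built-in sqlite3 module as a stand-in for the database. It is not Vertica and not Spark; the table name `t`, its columns, and the row count are made up for illustration. It contrasts fetching everything and truncating on the client, which is roughly the situation take(4) ends up in, with letting the database apply the limit itself:

```python
import sqlite3

# In-memory SQLite database, used purely as a stand-in for the real database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, value TEXT)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?)",
    [(i, "row%d" % i) for i in range(10000)],
)


def client_side_limit(conn, n):
    """Fetch the whole table, then keep n rows (the limit is applied late)."""
    rows = conn.execute("SELECT * FROM t").fetchall()
    return rows[:n], len(rows)


def database_side_limit(conn, n):
    """Let the database apply the limit, so only n rows are transferred."""
    rows = conn.execute(
        "SELECT * FROM (SELECT * FROM t LIMIT ?) tmp", (n,)
    ).fetchall()
    return rows, len(rows)


client_rows, client_fetched = client_side_limit(conn, 4)
db_rows, db_fetched = database_side_limit(conn, 4)

# Number of rows that actually left the database in each case.
print(client_fetched, db_fetched)
```

Both variants end up with four rows, but only the second one avoids moving the whole table out of the database.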

If you want to push the limit down to the database, you'll have to do it statically, using a subquery as the dbtable parameter:

# Python
(sqlContext.read.format('jdbc')
    .options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))

// Scala
sqlContext.read.format("jdbc").options(Map(
  "url"     -> "xxxx",
  "dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp"
))

Please note that an alias for the subquery is mandatory.
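If you need this in more than one place, the subquery string can be built by a small helper. This is only a sketch: `limited_dbtable` and `read_limited` are hypothetical names, not part of Spark, and the `LIMIT` syntax is an assumption about the database dialect (SQL Server, for example, uses `TOP` instead). Only standard `DataFrameReader` calls (`format`, `options`, `load`) are used:

```python
def limited_dbtable(table, n, alias="tmp"):
    """Build a dbtable value that asks the database for at most n rows.

    Assumes a dialect that supports LIMIT. The alias is required because
    the subquery takes the place of a table name in the generated SQL.
    """
    n = int(n)
    if n < 0:
        raise ValueError("n must be non-negative")
    return "(SELECT * FROM {0} LIMIT {1}) {2}".format(table, n, alias)


def read_limited(sqlContext, url, table, n, **options):
    """Hypothetical wrapper: load at most n rows of `table` over JDBC.

    Extra keyword arguments (e.g. user, password) are passed through as
    JDBC options unchanged.
    """
    return (sqlContext.read.format("jdbc")
            .options(url=url, dbtable=limited_dbtable(table, n), **options)
            .load())


print(limited_dbtable("xxx", 4))
```

With a live `sqlContext`, something like `read_limited(sqlContext, 'xxxx', 'xxx', 4, user='xxxx', password='xxxx')` would then return a DataFrame backed by the limited subquery. Do not pass untrusted input as `table`, since it is interpolated directly into the SQL string.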

Note:

This behavior may be improved in the future, once Data Source API v2 is ready:

user10938362
zero323
  • Interesting. I'll work on it and come back to you to validate the answer if all is OK :) Thx! – pltrdy Mar 08 '16 at 14:44
  • @zero323 Should the character after `tmp` be `'` instead? The Scala example works with `sqlContext.read.jdbc("jdbc:sqlserver://example.com;databaseName=local;user=debug;password=debug", "(SELECT TOP 5 * FROM ExampleTable) tmp", new java.util.Properties)` – Răzvan Flavius Panda Apr 14 '16 at 09:30
  • @RăzvanPanda Yes, it should. Fixed, thanks. Options work in Scala as well. – zero323 Apr 14 '16 at 09:47
  • @zero323 It seems that using a filter is not translated to the database specific SQL either :( – Răzvan Flavius Panda Apr 14 '16 at 09:55
  • @RăzvanPanda It all depends on the type of filter. See for example http://stackoverflow.com/a/32585936/1560062 Is that what you mean? _is limited to the logical conjunction_ - in particular. – zero323 Apr 14 '16 at 10:09
  • It seems that this is not solved yet https://issues.apache.org/jira/browse/SPARK-22388 – abiratsis Oct 15 '20 at 10:45