
I am connected via JDBC to a DB with 500,000,000 rows and 14 columns.

Here is the code used:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# JDBC connection properties (placeholders)
properties = {'jdbcurl': 'jdbc:db:XXXXXXXXX', 'user': 'XXXXXXXXX', 'password': 'XXXXXXXXX'}

# read the whole table over JDBC (no partitioning options)
data = spark.read.jdbc(properties['jdbcurl'], table='XXXXXXXXX', properties=properties)

data.show()

The code above took 9 seconds to display the first 20 rows of the DB.

Later I created a SQL temporary view via

data[['XXX','YYY']].createOrReplaceTempView("ZZZ")

and I ran the following query:

sqlContext.sql('SELECT AVG(XXX) FROM ZZZ').show()

The code above took 1355.79 seconds (circa 23 minutes). Is this normal? It seems like a very long time.

Finally, I tried to count the number of rows in the table:

sqlContext.sql('SELECT COUNT(*) FROM ZZZ').show()

It took 2848.95 seconds (circa 48 minutes).

Am I doing something wrong, or are these times to be expected?

FrancoFranchi
  • what happens if you add a " LIMIT 20" at the end of your first SQL query, does it speed up the result? I can't imagine that these processing times are standard. Do you utilize all of your cores? How many nodes are used (only local or in a cluster) and how much RAM is assigned to your workers? – Aydin K. Mar 05 '18 at 12:02
  • Hello Aydin, I have launched the query with "LIMIT 20". I'll let you know as soon as possible. How can I check the number of cores and the other information you are asking about? I can say that the following snippet "spark.sparkContext._jsc.sc().getExecutorMemoryStatus().keySet().size()" gives back 4. – FrancoFranchi Mar 05 '18 at 12:09
  • you can set up the cores and assign memory in different ways (spark-env or programmatically); have a look at this answer for how it's done programmatically in Python: https://stackoverflow.com/questions/41886346/spark-2-1-0-session-config-settings-pyspark – Aydin K. Mar 05 '18 at 12:21
  • you can retrieve all parameters of your job in the Spark GUI, usually at http://localhost:4040 – Aydin K. Mar 05 '18 at 12:22
  • and also, if you run both queries one after another on the same dataset, you should cache() or persist() it (preferably into memory if it fits, otherwise MEMORY_AND_DISK). This should speed up the 2nd and subsequent queries; see the sketch after these comments. – Aydin K. Mar 05 '18 at 12:23
  • Hello Aydin, with LIMIT 20 it took 2756 seconds (circa 46 minutes). I found the config and it seems that I have 'spark.executor.memory' = '6G' and 'spark.driver.memory' = '16384M'. I do not have any information regarding cores, and I cannot connect to localhost:4040 because I am working in a cloud environment. – FrancoFranchi Mar 05 '18 at 13:18
  • ok, strange. Try to launch the application again and connect to http://server-where-driver-runs:4040 to see where the most time is consumed; the Spark UI is meant for debugging, so maybe you can find the bottleneck with its help (maybe network latency between the workers?). The UI lives as long as the application runs and quits afterwards. If you want to preserve the UI, you need to start the spark-history server (in the spark/sbin folder) – Aydin K. Mar 05 '18 at 13:31
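To make the comment thread above concrete, here is a minimal sketch of the caching idea, reusing the placeholders from the question. The executor settings and the choice to cache are illustrative assumptions, not the OP's actual configuration, and on a managed cluster the resource values are often fixed at submit time instead:

from pyspark import StorageLevel
from pyspark.sql import SparkSession

# Illustrative resource settings (assumptions, not the OP's real values)
spark = (SparkSession.builder
         .config("spark.executor.instances", "4")
         .config("spark.executor.cores", "4")
         .config("spark.executor.memory", "6g")
         .getOrCreate())

jdbc_url = "jdbc:db:XXXXXXXXX"                               # placeholder from the question
properties = {"user": "XXXXXXXXX", "password": "XXXXXXXXX"}

data = spark.read.jdbc(jdbc_url, table="XXXXXXXXX", properties=properties)

# Cache in memory, spilling to disk if it does not fit, so later
# queries do not trigger another full JDBC scan.
data.persist(StorageLevel.MEMORY_AND_DISK)

data.createOrReplaceTempView("ZZZ")
spark.sql("SELECT AVG(XXX) FROM ZZZ").show()    # first query populates the cache
spark.sql("SELECT COUNT(*) FROM ZZZ").show()    # served from the cached data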

1 Answer


When you read a JDBC source with this method you lose parallelism, the main advantage of Spark. Please read the official Spark JDBC guidelines, especially regarding partitionColumn, lowerBound, upperBound and numPartitions. This allows Spark to run multiple JDBC queries in parallel, resulting in a partitioned DataFrame.

Tuning the fetchsize parameter may also help for some databases.
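For illustration, a minimal sketch of a partitioned read, assuming the table has a roughly uniformly distributed numeric column (here called ID_COLUMN, a placeholder) and reusing the placeholders from the question; the bounds, partition count and fetch size are assumptions to adapt:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

jdbc_url = "jdbc:db:XXXXXXXXX"                   # placeholder from the question
properties = {"user": "XXXXXXXXX",
              "password": "XXXXXXXXX",
              "fetchsize": "10000"}              # rows per round trip; effect is driver-dependent

# Spark issues numPartitions JDBC queries in parallel, each covering a slice
# of ID_COLUMN between lowerBound and upperBound (per the Spark docs, the
# bounds only define the partition stride).
data = spark.read.jdbc(
    url=jdbc_url,
    table="XXXXXXXXX",
    column="ID_COLUMN",          # named partitionColumn in the option-based API
    lowerBound=1,
    upperBound=500000000,
    numPartitions=16,
    properties=properties,
)

data[['XXX', 'YYY']].createOrReplaceTempView("ZZZ")
spark.sql("SELECT AVG(XXX) FROM ZZZ").show()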

Mariusz
  • Hi Mariusz, actually I do not have the parameter 'partitionColumn' in my spark.read.jdbc(...), where 'spark = SparkSession.builder.getOrCreate()'. I just have the option 'column'. Is there any other method to read from a source without losing parallelism? – FrancoFranchi Mar 06 '18 at 08:16
  • I am using Spark 2.1.0 – FrancoFranchi Mar 06 '18 at 08:23
  • `column` is OK, it was renamed in later versions. – Mariusz Mar 06 '18 at 14:17
  • Hi Mariusz, I set the _column_ parameter and the others you suggested (_lowerBound_ = 1 and _upperBound_ = 10000), but it seems that this way it doesn't read the DB at all. – FrancoFranchi Mar 07 '18 at 08:13
  • It will read rows with values 1 to 10000 in this column. You need to adapt this range to the real data; see the sketch after these comments. – Mariusz Mar 07 '18 at 11:06
  • Hi Mariusz, I set the correct range for the specified column but it is still very slow. Is it possible that I have fewer nodes/executors than I expected? – FrancoFranchi Mar 08 '18 at 15:12
  • Well, did you try increasing the level of parallelism in Spark by adding more executors or more cores per executor? – Mariusz Mar 09 '18 at 20:41
  • Can you explain how I can apply your suggestions (increasing the level of parallelism in Spark by adding more executors or more cores per executor)? Many thanks! – FrancoFranchi Mar 12 '18 at 09:32
  • In a Spark environment the parallelism level is controlled by the number of executors (`--num-executors`) and the number of threads per executor (`--executor-cores`). I suggest you read more about Spark basics, for example in the [official documentation](https://spark.apache.org/docs/latest/). – Mariusz Mar 12 '18 at 19:30
  • I actually read it, but it is not clear enough to me. I would like to know how the partitioning is controlled in order to set the parameters properly. – FrancoFranchi Mar 13 '18 at 07:58
  • Please take a look at these answers: https://stackoverflow.com/a/36855369, https://stackoverflow.com/a/49259008 – Mariusz Mar 13 '18 at 21:44
  • Thank you Mariusz! – FrancoFranchi Mar 14 '18 at 13:55
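A sketch of the adjustment discussed in the comments above, under the assumption of a hypothetical numeric ID_COLUMN: derive the real bounds from the database first, then read with them. The spark-submit flags in the comment are the options Mariusz mentions, with illustrative values:

from pyspark.sql import SparkSession

# Illustrative spark-submit invocation for more parallelism (values are assumptions):
#   spark-submit --num-executors 4 --executor-cores 4 --executor-memory 6g job.py

spark = SparkSession.builder.getOrCreate()

jdbc_url = "jdbc:db:XXXXXXXXX"                               # placeholder from the question
properties = {"user": "XXXXXXXXX", "password": "XXXXXXXXX"}

# Push a tiny aggregate down to the database to get the real value range
# of the (hypothetical) partition column.
bounds = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT MIN(ID_COLUMN) AS lo, MAX(ID_COLUMN) AS hi FROM XXXXXXXXX) t",
    properties=properties,
).collect()[0]

data = spark.read.jdbc(
    url=jdbc_url,
    table="XXXXXXXXX",
    column="ID_COLUMN",
    lowerBound=bounds[0],
    upperBound=bounds[1] + 1,
    numPartitions=16,            # ideally a small multiple of the total executor cores
    properties=properties,
)

print(data.rdd.getNumPartitions())   # verify the read is actually partitioned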