0

I am quite new to PySpark (or Spark in general). I am trying to connect Spark with a MySQL instance I have running on RDS. When I load the table like so, does Spark load the entire table in memory?

from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars", "/usr/share/java/mysql-connector-java-8.0.33.jar") \
              .master("spark://spark-master:7077") \
              .appName("app_name") \
              .getOrCreate()

table_1_df = spark.read.format("jdbc").option("url", "jdbc:mysql://mysql:3306/some_db") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "table1") \
    .option("user", "user") \
    .option("password", "pass") \
    .load()

print(table_1_df.head())

If yes, is there a way to limit it, say by asking Spark to load contents based on a condition? I would like to see if its possible to limit the fetch by (say) a primary key. Any input would be helpful. Thank you

Bhargav Panth
  • 301
  • 1
  • 3
  • 10
  • Does it not support the MySQL command "WHERE" to specify criteria? Such as WHERE pkey = 2 ? I'm not seeing anything like a statement so It seems you are just loading the whole table. – easleyfixed Jul 18 '23 at 16:31
  • Minor research found information pointing to this -- > filtered_df = df.where ("id > 1") so it is possible .. look here https://learn.microsoft.com/en-us/azure/databricks/getting-started/dataframes-python – easleyfixed Jul 18 '23 at 16:34
  • I know the example is for Azure and Databricks but I'm hoping you can gleam the concept from that link. – easleyfixed Jul 18 '23 at 16:37
  • Hi, thanks for chiming in easleyfixed. I can see that the WHERE is applied on the dataframe (not on the query itself). At this point, would it not have loaded the data from the table and converted it into a Pandas frame? All that seems to do is overfetch and filter after – Bhargav Panth Jul 18 '23 at 16:46
  • It seems like that might be the case, I was skimming so I apologize. It looked like it would over pull and then just filter down to what you needed, so obviously efficiency wise its best to only pull the data you need, but ultimately you are going to need to find a way to feed a where statement to define criteria, or a LIMIT 1000 to only pull 1000 records sort of thing. – easleyfixed Jul 18 '23 at 17:06

1 Answers1

2

Similar / related SO answer: Does spark predicate pushdown work with JDBC?

The built-in JDBC source for Spark sometimes performs what is called "Predicate Pushdown" which basically means it tries its best to translate general SQL transformations such as .where(...) down the low-level source's implementation -- which is completely different from source to source (e.g. in MySQL you use WHERE ... but for Parquet files on S3 there is no such thing as WHERE ...).

If you want to have more explicit control over the SQL query that gets executed when using the JDBC source, you can change the dbtable option to use a subquery instead of just the name of the table. This way, Spark has to execute the query as-is; it is a tradeoff of readability and stickiness to the data source vs. higher level APIs of Spark.

# Note that to make it a subquery it must be encapsulated in parenthesis and aliased!
table_1_query = "(SELECT * FROM table1 WHERE inserted_at >= NOW() - INTERVAL 7 DAY) a"

table_1_df = spark.read.format("jdbc").option("url", "jdbc:mysql://mysql:3306/some_db") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", table_1_qury) \
    .option("user", "user") \
    .option("password", "pass") \
    .load()

This is also described on Databricks' documentation: https://docs.databricks.com/external-data/jdbc.html#push-down-a-query-to-the-database-engine


That said, the built-in JDBC source in Spark 3.x does a good job translating most common operations with the predicate pushdown, meaning you can usually do things like .where(...), .select(...), .limit(...) without having to trick the framework with the subquery approach.

When in doubt, just test it, view the Spark execution plan with .explain() or in the Spark UI, and if you can catch it in time you can also see the running query on your MySQL server too -- that will reveal what the true SQL query Spark generates is.

Zach King
  • 798
  • 1
  • 8
  • 21
  • 1
    That makes sense. Also, FWIW it looks like we need to alias the query before passing it in. Something like `(SELECT * FROM table1 WHERE inserted_at >= NOW() - INTERVAL 7 DAY) AS something`. Otherwise, I seem to be getting this error `Every derived table must have its own alias` – Bhargav Panth Jul 19 '23 at 09:50