I am not an expert on the Spark SQL API, nor on the underlying RDD one.
But, knowing of the Catalyst optimization engine, I would expect Spark to try and minimize in-memory effort.
This is my situation: I have, let's say, two tables:
TABLE GenericOperation (ID, CommonFields...)
TABLE SpecificOperation (OperationID, SpecificFields...)
They are both quite large (~500M: not big data, but infeasible to have as a whole in memory on a standard application server).
That said, suppose I have to retrieve using Spark (as part of a larger use case) all the SpecificOperation instances that match some particular condition on fields that belong to GenericOperation.
This is the code that I am using:
// Read both tables through the JDBC data source
val gOps = spark.read.jdbc(db.connection, "GenericOperation", db.properties)
val sOps = spark.read.jdbc(db.connection, "SpecificOperation", db.properties)
// Join on the key, then filter on a GenericOperation field and project
val joined = sOps.join(gOps).where("ID = OperationID")
joined.where("CommonField = 'SomeValue'").select("SpecificField").show()
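In case the way I express the join matters: I assume the same applies with an explicit join condition, along these lines (sOps and gOps being the DataFrames above), but I have not verified that it changes anything:

// Equivalent formulation with an explicit equi-join condition instead of a
// join followed by a where; I would expect the same behaviour.
val joinedExplicit = sOps.join(gOps, sOps("OperationID") === gOps("ID"))
joinedExplicit
  .where(gOps("CommonField") === "SomeValue")
  .select(sOps("SpecificField"))
  .show()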
The problem is, when I run the above, I can see from SQL Profiler that Spark does not execute the join on the database, but rather retrieves all the OperationID values from SpecificOperation and then, I assume, runs the whole merge in memory. Since no filter is applicable on SpecificOperation, such a retrieval would bring far too much data to the end system.
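If it helps, I believe the same can be seen from the Spark side by printing the query plan; I would expect it to show two separate JDBCRelation scans with only the simple filter pushed down, and the join performed by Spark itself:

// Print the parsed/analyzed/optimized/physical plans for the final query
joined.where("CommonField = 'SomeValue'").select("SpecificField").explain(true)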
Is it possible to write the above so that the join is delegated directly to the DBMS? Or does it depend on some magic Spark configuration I am not aware of?
Of course, I could simply hardcode the join as a subquery when retrieving, but that is not feasible in my case: statements have to be created at runtime starting from simple building blocks. Hence, I need to implement this starting from two spark.sql.DataFrame instances that are already built up.
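For clarity, this is roughly the workaround I am trying to avoid, i.e. handing the whole statement to SQL Server as a derived-table subquery passed to spark.read.jdbc in place of a table name (table and column names as above):

// Hardcoded pushdown: the join and filter run entirely on the database side,
// but I cannot assemble the statement this way at runtime from my building blocks.
val pushedDown =
  """(SELECT s.SpecificField
    |   FROM SpecificOperation s
    |   JOIN GenericOperation g ON g.ID = s.OperationID
    |  WHERE g.CommonField = 'SomeValue') AS joined""".stripMargin
spark.read.jdbc(db.connection, pushedDown, db.properties).show()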
As a side note, I am running this with Spark 2.3.0 for Scala 2.11, against a SQL Server 2016 database instance.