2

When I use JDBC connection to feed spark, even if I use filtering on dataframe; when I inspect query log on my oracle datasource, I am seeing spark executing:

SELECT [column_names] FROM MY_TABLE

Referring to https://stackoverflow.com/a/40870714/1941560,

I was expecting spark lazily plan query and execute like;

SELECT [column_names] FROM MY_TABLE WHERE [filter_predicate]

But spark is not doing that. It takes all the data and filters afterwards. I need this behaviour because I don't want to retrieve all the table every x minutes but only changed rows (incremental filterin by UPDATE_DATE).

Is there a way to achieve this?

Here is my python code:

df = ...
lookup_seconds = 5 * 60;
now = datetime.datetime.now(pytz.timezone("some timezone"))
max_lookup_datetime = now - datetime.timedelta(seconds=lookup_seconds)
df.where(df.UPDATE_DATE > max_lookup_datetime).explain()

Explain result:

Physical Plan == *Filter (isnotnull(UPDATE_DATE#21) && (UPDATE_DATE#21 > 1516283483208806)) +- Scan ExistingRDD[NO#19,AMOUNT#20,UPDATE_DATE#21,CODE#22,AMOUNT_OLD#23] 

EDIT: Complete answer is here

px5x2
  • 1,495
  • 2
  • 14
  • 29
  • 1
    Could you show, how you create `df` and extended execution plan (`explain(True)`). – zero323 Jan 18 '18 at 15:21
  • It would help to see how exactly you filter the data. Do you create the data frame using JDBC connector, and then filter? If so, there's no predicate pushdown in JDBC connector, and @Guitao's answer is the correct one. However, if you are filtering with the dbtable property, then please add a code snippet so we could assist. – Lior Chaga Jan 18 '18 at 19:13
  • @LiorChaga Spark supports predicate pushdown with JDBC source (I already posted plan with exact the same logic as OP, where filter is pushed down). There is no need for manual filters for trivial predicates. Still - without [mcve] we can only guess what is going on. – zero323 Jan 18 '18 at 23:23

2 Answers2

2

Most likely scenario here is that you cache input DataFrame. In that case, Spark won't attempt selection or projection pushdown, and instead will fetch data to the cluster, and process locally.

It easy to illustrate this behavior:

df = spark.read.jdbc(url, table, properties={})
df
DataFrame[id: int, UPDATE_DATE: timestamp]

If data is not cached:

df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Physical Plan ==
*Scan JDBCRelation(df) [numPartitions=1] [UPDATE_DATE#1] PushedFilters: [*IsNotNull(UPDATE_DATE), *GreaterThan(UPDATE_DATE,2018-01-18 15:35:13.07596)], ReadSchema: struct<UPDATE_DATE:timestamp>

both selection and projection are pushed down. However if you cache df, and check execution plan once again:

df.cache()
DataFrame[id: int, UPDATE_DATE: timestamp]
df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
   +- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>

== Physical Plan ==
*Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- InMemoryTableScan [UPDATE_DATE#1], [isnotnull(UPDATE_DATE#1), (UPDATE_DATE#1 > 1516289713075960)]
      +- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>

both projection and selection are delayed.

zero323
  • 322,348
  • 103
  • 959
  • 935
  • I upvoted this because, api v2 must address this issue as mentioned in https://stackoverflow.com/questions/32573991/does-spark-predicate-pushdown-work-with-jdbc – px5x2 Jan 19 '18 at 05:48
  • Btw, I didn't use caching. – px5x2 Jan 19 '18 at 05:49
  • API V2 doesn't add anything new, that would cover this scenario. To reiterate - this should work in the current API, unless you do something, that directly prevents it. Caching is only one possible option, but once again - not possible to answer for sure without seeing full code and execution plan. – zero323 Jan 19 '18 at 17:40
1

From the official doc1:

dbtable The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.

You could set the JDBC option dbtable to a subquery SQL. For example:

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "(select * from tbl where UPDATE_DATE > max_lookup_datetime) t") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
zero323
  • 322,348
  • 103
  • 959
  • 935
Guitao
  • 107
  • 4