
I have a PySpark (2.3.0) dataframe with a timestamp type column:

>> df.show()
+-------------------+
|            column |
+-------------------+
|2004-02-16 12:01:37|
|2004-02-23 10:28:49|
|2004-02-23 12:49:14|
|2004-02-26 12:29:58|
|2004-03-02 10:10:28|
|2004-03-03 03:40:13|
|2004-03-16 05:00:10|
|2004-03-16 03:28:21|
|2004-03-17 02:45:22|
|2004-03-23 08:14:47|
+-------------------+
>> df.printSchema()
root
|-- column: timestamp (nullable = true)

I want to filter that dataframe to find records on a specific date:

import datetime
date = datetime.datetime.strptime('2018-06-07', '%Y-%m-%d').date()

What is the most efficient method of doing this filtering? Note: the data is read in via JDBC, so it may not be distributed.

Here is what I have tried (I have not noticed major differences); which is preferable? Have I missed anything?

Method 1: Cast as date

import pyspark.sql.functions as psf

df.filter(psf.col('column').cast('date') == date)

Method 2: match on year, month, dayofmonth

import pyspark.sql.functions as psf
(
  df
  .filter(psf.dayofmonth('column') == date.day)
  .filter(psf.month('column') == date.month)
  .filter(psf.year('column') == date.year)
)
  • Can you [edit] your question to add the output of `df.printSchema()`. I could imagine a way where the second method could be faster depending on the skew of day, month, and year. For example, the way you are checking is optimal if the first filter on days fails more often than the last check on year. But it would be slower if, for example, many of your dates were on the first of the month. – pault Jun 08 '18 at 15:23
  • 1
  • I added `printSchema()`! For this example let's imagine we do not know the distribution of the dates ahead of time. Is the first method then safer? – Nolan Conaway Jun 08 '18 at 15:31
  • It is read from SQL Server over JDBC. – Nolan Conaway Jun 08 '18 at 15:40

1 Answer


Here is what I have tried (I have not noticed major differences); which is preferable?

Neither. Both methods are inefficient and cannot fully leverage database and Spark capabilities. Because `column` appears to be datetime or an equivalent type, and the query requires a cast, Spark cannot push the predicate down to the database; the filtering is applied on the cluster side, so performance will be similar for both (give or take the overhead of a few function calls).
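A quick way to see this for yourself (not part of the original answer) is to inspect the physical plan; a minimal sketch, assuming df is the JDBC-backed dataframe from the question:

import pyspark.sql.functions as psf

# With the cast inside the filter, the JDBC scan node typically reports an
# empty PushedFilters list, i.e. the predicate is evaluated by Spark rather
# than by the database.
df.filter(psf.col('column').cast('date') == date).explain()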

To improve performance you can redefine the query as follows (plus any other parameters you normally use):

df = spark.read.jdbc(
    url,
    "(SELECT CAST(column AS date) date, * FROM table) AS tmp",
    ...
)

and then:

df.filter(psf.col('date') == date)
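Because the cast now happens in the database query and date is a plain date column, this equality filter should be eligible for predicate pushdown; as a rough sanity check (same df as above, not from the original answer), the predicate should appear under PushedFilters in the JDBC scan:

# The equality on the pre-cast date column can be compiled into the JDBC query.
df.filter(psf.col('date') == date).explain()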

If you don't plan to distribute the reading process or use dynamic queries, you can also use predicates:

spark.read.jdbc(
    ...,
    predicates=["CAST(column AS date) = '{}'".format(date)]
)

or embed selection in the table definition.
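For example, a minimal sketch of embedding the selection in the table definition (connection details elided; table and column names are the placeholders used in the answer's query above):

# Filtering happens entirely in the database before Spark reads any rows.
df = spark.read.jdbc(
    url,
    "(SELECT * FROM table WHERE CAST(column AS date) = '{}') AS tmp".format(date),
    ...
)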

  • Gnarly! This speeds things up a lot. Even better is to do the filtering within the SQL query (which is a possibility for my particular case). – Nolan Conaway Jun 08 '18 at 18:31