
This is my first time working with either Python or Spark; I'm a Java developer, so I'm not sure what the best way to solve this is.

I'm working with:

  • Spark 2.2.0 built for Hadoop 2.7.3
  • Python 2.7.12

I have a PySpark script that executes several queries and creates temporary views, until it finally executes a final query using/joining the different temporary views. It then writes files with the result of that final query.

The script works fine, but we found out that when there is no data, it still creates the 200 files (all empty). We wanted to validate that there actually is data before calling the write method, or even before creating the temporary views, so we tried `if df.count() == 0:` and raised an error in that case, otherwise just proceeding.
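For reference, a minimal sketch of what that validation looked like (the dataframe, view, and column names here are placeholders, not the real script's):

    # Hypothetical sketch of the original count()-based guard.
    # "view_a", "view_b" and "final_view" are placeholder names; "spark" is an
    # already-created SparkSession, as in the real script.
    final_df = spark.sql("SELECT a.*, b.value FROM view_a a JOIN view_b b ON a.id = b.id")

    if final_df.count() == 0:
        raise RuntimeError("No data found, aborting before creating the temporary view")

    final_df.createOrReplaceTempView("final_view")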

I added that validation only to the final two dataframes, right before creating their temporary views, so the process is interrupted as early as possible, before the following queries execute.

Then we read somewhere that count is a very expensive way to validate that there is data, because it goes through all the executors, so before even trying it we switched to something recommended in several places: using `df.take(1)`, `df.head(1)`, or `df.first()`. We finally went with `head(1)`.
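The replacement looked roughly like this (again a sketch with placeholder names, matching the condition quoted in the comments below):

    # Hypothetical sketch of the head(1)-based guard that replaced count()
    head = final_df.head(1)          # returns a list with at most one Row
    if not head or len(head) == 0:   # an empty list means the dataframe has no rows
        raise RuntimeError("No data found, aborting before creating the temporary view")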

However, this changed the execution elapsed time from 30 mins to actually more than 1h 40m.

I'd like to know what other way I can prevent Spark from writing empty files, without increasing the computation time that much.

Since I'm new to all this, I'm open to suggestions.

Edit

I have already read this thread: How to check if spark dataframe is empty. From that very thread, I took that I should use `len(df.head(1)) == 0`, and that increased the computing time from 30 minutes to 1h 40m+.

  • Possible duplicate of [How to check if spark dataframe is empty](https://stackoverflow.com/questions/32707620/how-to-check-if-spark-dataframe-is-empty) – mtoto Jan 23 '18 at 12:56
  • Well, that is exactly one of the threads I read last week, and it is from that precise one that I took `df.head(1)`, and **it slowed the process down by a lot more than an hour**. – user3049941 Jan 23 '18 at 13:09
  • We need an [MVCE](https://stackoverflow.com/help/mcve) to reproduce it, otherwise it is not salvageable and I'm voting to close the question. `count` is more expensive than head because it will scan all the data, but you might be doing something else before that which actually needs computing, and this might not be your bottleneck... – eliasah Jan 23 '18 at 15:39
  • 3
    One of your columns must be expensive to calculate . Is there a non-computed column you can select: `len(df.select('non-computed column').head(1)) == 0` – Alex Jan 23 '18 at 15:44
  • 1
    unless he's pull data from jdbc or something like that @Jaco that's why we need the MVCE... – eliasah Jan 23 '18 at 15:47
  • 1) I ran the script with `row = df.head(1)` and it took 1h 51m. 2) I commented out that line and the following condition and its block (`if not head or len(head) == 0:`) and the process ran in 31m, but it just generated the empty files. – user3049941 Jan 23 '18 at 15:47
  • @Jaco I'm going to try that – user3049941 Jan 23 '18 at 15:49
  • @Jaco I did what you told me; apparently I have several columns that take long to compute, so I picked a non-computed one. It did reduce the overall time: the process had taken 1h 40m with the old validation `len(df.head(1))==0` and 30m without any validation, and now it took 1h. But I had three validations: one on each of the two main dataframes that create the final two views used in the final query, and one right before writing the files, on the last dataframe that contains the actual data for the files. Now I have only the last one, and I'm going to try that out. – user3049941 Jan 23 '18 at 20:42

2 Answers


Just get your dataframe's underlying RDD and check whether it is empty:

df.rdd.isEmpty()

There are two types of operations in Spark: actions and transformations. All transformations in Spark are lazy; they do not compute their results right away. They are only computed when an action is executed. Actions are costly because Spark needs to run all the transformations up to that point in order to execute the action.
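For illustration, a minimal sketch of using that check to skip the write entirely (the dataframe name and output path are placeholders, not taken from the question):

    # Sketch: guard the write with rdd.isEmpty() so no empty part files are produced.
    # "result_df" and the output path are placeholder names.
    if result_df.rdd.isEmpty():
        print("Result is empty, skipping write")
    else:
        result_df.write.mode("overwrite").parquet("/tmp/output")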

Henrique Florencio

@Jaco I finally did something like `if df.select('my_no_computed_column').head() is None:`, because apparently `head()` with no parameter assumes 1, and according to Spark's code:

    @ignore_unicode_prefix
    @since(1.3)
    def head(self, n=None):
        """Returns the first ``n`` rows.

        .. note:: This method should only be used if the resulting array is expected
            to be small, as all the data is loaded into the driver's memory.

        :param n: int, default 1. Number of rows to return.
        :return: If n is greater than 1, return a list of :class:`Row`.
            If n is 1, return a single Row.

        >>> df.head()
        Row(age=2, name=u'Alice')
        >>> df.head(1)
        [Row(age=2, name=u'Alice')]
        """
        if n is None:
            rs = self.head(1)
            return rs[0] if rs else None
        return self.take(n)

it will return `None` if there are no rows (I might be reading it all wrong, though; I've been programming in Java for over 10 years now, and Python as well as Spark are too new for me, and Python looks too odd to my eyes).

It did reduce the running time considerably.
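Roughly, the final guard looks like this (a sketch; the column name is the placeholder from above and the write call is just an example):

    # Sketch of the final validation before writing; "my_no_computed_column" is the
    # placeholder column name from above and the CSV write is only an example.
    if final_df.select('my_no_computed_column').head() is None:
        raise RuntimeError("No data found, not writing any files")

    final_df.write.csv("/tmp/output")  # hypothetical output path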

user3049941