
I have a dataset of ~5M rows x 20 columns, containing a groupID and a rowID. My goal is to check whether (some) columns contain more than a fixed fraction (say, 50%) of missing (null) values within a group. If they do, the entire column is set to missing (null) for that group.
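
For illustration, the rule for a single column could also be written in one pass with window functions (just a sketch using 'col1' and the 50% threshold from above; it is not the approach I take below):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('groupID')
# fraction of null values of col1 within each group
null_frac = (F.sum(F.col('col1').isNull().cast('int')).over(w)
             / F.count(F.lit(1)).over(w))
df_checked = df.withColumn('col1', F.when(null_frac > 0.5, F.lit(None))
                                    .otherwise(F.col('col1')))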

from pyspark.sql import functions as F

df = spark.read.parquet('path/to/parquet/')
check_columns = {'col1': ..., 'col2': ..., ...}  # currently len(check_columns) = 8

for col, _ in check_columns.items():
    # total number of rows per group
    total = (df
             .groupBy('groupID').count()
             .toDF('groupID', 'n_total')
             )

    # number of rows per group where the current column is null
    missing = (df
               .where(F.col(col).isNull())
               .groupBy('groupID').count()
               .toDF('groupID', 'n_missing')
               )
    # count_missing = count_missing.persist()  # PERSIST TRY 1
    # print('col {} found {} missing'.format(col, missing.count()))  # missing.count() is b/w 1k-5k

    # groups where more than 50% of the values in the current column are null
    poor_df = (total
               .join(missing, 'groupID')
               .withColumn('freq', F.col('n_missing') / F.col('n_total'))
               .where(F.col('freq') > 0.5)
               .select('groupID')
               .toDF('poor_groupID')
               )

    # null out the current column for all rows belonging to a "poor" group
    df = (df
          .join(poor_df, df['groupID'] == poor_df['poor_groupID'], 'left_outer')
          .withColumn(col, (F.when(F.col('poor_groupID').isNotNull(), None)
                            .otherwise(df[col])
                            )
                    )
        .select(df.columns)
        )

    # collect the per-group missing counts to the driver
    stats = (missing
             .withColumnRenamed('n_missing', 'cnt')
             .collect()  # FAIL 1
             )

    # df = df.persist()  # PERSIST TRY 2

print(df.count())  # FAIL 2

I initially assigned 1G of spark.driver.memory and 4G of spark.executor.memory, and eventually increased spark.driver.memory up to 10G.
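
For completeness, this is roughly how such settings can be passed (a sketch; depending on how the application is launched, spark.driver.memory may instead have to be supplied via spark-submit --driver-memory or spark-defaults.conf, because the driver JVM reads it at startup):

from pyspark import SparkConf
from pyspark.sql import SparkSession

cfg = SparkConf()
cfg.set('spark.driver.memory', '10g')    # only effective if set before the driver JVM starts
cfg.set('spark.executor.memory', '4g')
spark = SparkSession.builder.config(conf=cfg).getOrCreate()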

Problem(s): The loop runs like a charm during the first iterations, but around the 6th or 7th iteration CPU utilization drops (1 core in use instead of 6) and the execution time of a single iteration increases significantly. At some point I get an OutOfMemory error:

  • spark.driver.memory < 4G: at collect() (FAIL 1)
  • 4G <= spark.driver.memory < 10G: at the count() step (FAIL 2)

Stack Trace for FAIL 1 case (relevant part):

[...]
py4j.protocol.Py4JJavaError: An error occurred while calling o1061.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
[...]

The executor UI does not reflect excessive memory usage (it shows <50k of used memory for the driver and <1G for the executor). Neither does the Spark metrics system (app-XXX.driver.BlockManager.memory.memUsed_MB): it shows 600M to 1200M of used memory, but always >300M of remaining memory. (This would suggest that 2G of driver memory should suffice, but it doesn't.)

It also does not matter which column is processed first (as it is a loop over a dict(), it can be in arbitrary order).

My questions thus:

  • What causes the OutOfMemory Error, and why aren't all available CPU cores used towards the end?
  • And why do I need 10G spark.driver.memory when I am transferring only a few kB from the executors to the driver?

A few (general) questions to make sure I understand things properly:

  • If I get an OOM error, the right place to look at is almost always the driver (b/c the executor spills to disk)?
  • Why would count() cause an OOM error - I thought this action would only consume resources on the executor(s) (delivering a few bytes to the driver)?
  • Are the memory metrics (metrics system, UI) mentioned above the correct places to look at?

BTW: I run Spark 2.1.0 in standalone mode.

UPDATE 2017-04-28

To drill down further, I enabled a heap dump for the driver:

from pyspark import SparkConf

cfg = SparkConf()
cfg.set('spark.driver.extraJavaOptions', '-XX:+HeapDumpOnOutOfMemoryError')

I ran it with 8G of spark.driver.memory and I analyzed the heap dump with Eclipse MAT. It turns out there are two classes of considerable size (~4G each):

java.lang.Thread
    - char (2G)
    - scala.collection.IndexedSeqLike
        - scala.collection.mutable.WrappedArray (1G)
    - java.lang.String (1G)

org.apache.spark.sql.execution.ui.SQLListener
    - org.apache.spark.sql.execution.ui.SQLExecutionUIData 
      (various instances, up to 1G in size each)
        - java.lang.String
    - ...

I tried to turn off the UI, using

cfg.set('spark.ui.enabled', 'false')

which made the UI unavailable, but didn't help with the OOM error. Also, I tried to have the UI keep less history, using

cfg.set('spark.ui.retainedJobs', '1')
cfg.set('spark.ui.retainedStages', '1')
cfg.set('spark.ui.retainedTasks', '1')
cfg.set('spark.sql.ui.retainedExecutions', '1')
cfg.set('spark.ui.retainedDeadExecutors', '1')

This also did not help.

UPDATE 2017-05-18

I found out about Spark's pyspark.sql.DataFrame.checkpoint method. It is similar to persist, but it also discards the DataFrame's lineage, which circumvents the issues described above.
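
A minimal sketch of how it can be wired into the loop (the checkpoint directory path is illustrative; checkpoint() requires one to be set):

spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')  # illustrative path

for col, _ in check_columns.items():
    # ... same per-column logic as above ...
    # materialize df and truncate its lineage at the end of each iteration,
    # so the query plan does not keep growing across iterations
    df = df.checkpoint()  # eager by default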

  • Is `path/to/parquet/` on a local filesystem or HDFS for example? – ImDarrenG Apr 26 '17 at 15:54
  • it is a path on the local filesystem – akoeltringer Apr 26 '17 at 15:57
  • Briefly, please can you explain what lines 18 to 33 are doing? – ImDarrenG Apr 26 '17 at 16:01
  • Scratch that, I can follow the logic now. – ImDarrenG Apr 26 '17 at 16:11
  • The original dataframe containing the contents of the file is used repeatedly in the loop, so should definitely be cached up front. That will prevent it being recomputed for each iteration of the loop. – Chris May 17 '20 at 14:48
  • "If I get an OOM error, the right place to look at is almost always the driver (b/c the executor spills to disk)?" - No, the nodes can run out of memory during shuffles for example. – Chris May 17 '20 at 14:49
  • "And why do I need 10G spark.driver.memory when I am transferring only a few kB from the executors to the driver?" - Could have been related to https://issues.apache.org/jira/browse/SPARK-12837. Fixed in v2.2.0 – Chris May 17 '20 at 14:52
