
I'm running into an error that I think is caused by the window function.

When I apply this script and persist just a few sample rows, it works fine. When I apply it to my whole dataset (only a few GB), it fails with this bizarre error on the last step, when persisting to HDFS. The script works when I persist without the window function, so the problem must come from that (I have around 325 feature columns running through the for loop).

Any idea what could be causing the problem? My goal is simply to impute time series data by forward filling every variable in my DataFrame.
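For reference, forward fill here means replacing each missing marker with the most recent real value seen earlier in time order. A minimal plain-Python sketch of that rule, independent of Spark; the function name `forward_fill` is hypothetical and exists only for illustration:

```python
def forward_fill(values, missing="UNKNOWN"):
    """Replace each missing marker (or None) with the last real value before it."""
    filled = []
    last_seen = None  # stays None until the first real value appears
    for value in values:
        if value is not None and value != missing:
            last_seen = value
        filled.append(last_seen)
    return filled


# The v1 column from the sample data below, in date order
v1 = ["10", "UNKNOWN", "6", "7", "10", "UNKNOWN", "UNKNOWN", "50"]
print(forward_fill(v1))  # ['10', '10', '6', '7', '10', '10', '10', '50']
```

Leading missing values stay `None` because there is nothing earlier to carry forward.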

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
import sys

spark = SparkSession.builder.getOrCreate()  # reuse the active session if one exists
print(spark.version)  # 2.3.0

# sample data
df = spark.createDataFrame([('2019-05-10 7:30:05', '1', '10', '0.5', 'FALSE'),\
                            ('2019-05-10 7:30:10', '2', 'UNKNOWN', '0.24', 'FALSE'),\
                            ('2019-05-10 7:30:15', '3', '6', 'UNKNOWN', 'TRUE'),\
                            ('2019-05-10 7:30:20', '4', '7', 'UNKNOWN', 'UNKNOWN'),\
                            ('2019-05-10 7:30:25', '5', '10', '1.1', 'UNKNOWN'),\
                            ('2019-05-10 7:30:30', '6', 'UNKNOWN', '1.1', 'NULL'),\
                            ('2019-05-10 7:30:35', '7', 'UNKNOWN', 'UNKNOWN', 'TRUE'),\
                            ('2019-05-10 7:30:49', '8', '50', 'UNKNOWN', 'UNKNOWN')], ["date", "id", "v1", "v2", "v3"])

df = df.withColumn("date", F.col("date").cast("timestamp"))

# imputer process / all cols that need filled are strings
def stringReplacer(x, y):
    return F.when(x != y, x).otherwise(F.lit(None)) # replace with NULL

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
  for i in cols:
    window = Window\
    .partitionBy(F.month(partitioner))\
    .orderBy(partitioner)\
    .rowsBetween(-sys.maxsize, 0)

    df = df\
    .withColumn(i, stringReplacer(F.col(i), value))
    fill = F.last(df[i], ignorenulls=True).over(window)
    df = df.withColumn(i,  fill)
  return df
df2 = forwardFillImputer(df, cols=[i for i in df.columns])

# errors here
df2\
.write\
.format("csv")\
.mode("overwrite")\
.option("header", "true")\
.save("test_window_func.csv")
Py4JJavaError: An error occurred while calling o13504.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:381)

Possible working solution:

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
    window = Window \
     .partitionBy(F.month(partitioner)) \
     .orderBy(partitioner) \
     .rowsBetween(-sys.maxsize, 0)
    imputed_cols = [F.last(stringReplacer(F.col(i), value), ignorenulls=True).over(window).alias(i) 
                    for i in cols]
    missing_cols = [i for i in df.columns if i not in cols]
    return df.select(missing_cols+imputed_cols)

df2 = forwardFillImputer(df, cols=[i for i in df.columns[1:]])

df2.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)
 |-- v3: string (nullable = true)

df2.show()
+-------------------+---+---+----+-----+
|               date| id| v1|  v2|   v3|
+-------------------+---+---+----+-----+
|2019-05-10 07:30:05|  1| 10| 0.5|FALSE|
|2019-05-10 07:30:10|  2| 10|0.24|FALSE|
|2019-05-10 07:30:15|  3|  6|0.24| TRUE|
|2019-05-10 07:30:20|  4|  7|0.24| TRUE|
|2019-05-10 07:30:25|  5| 10| 1.1| TRUE|
|2019-05-10 07:30:30|  6| 10| 1.1| NULL|
|2019-05-10 07:30:35|  7| 10| 1.1| TRUE|
|2019-05-10 07:30:49|  8| 50| 1.1| TRUE|
+-------------------+---+---+----+-----+
thePurplePython

1 Answer

Judging by the stack trace, I believe the error comes from preparing the execution plan, as it says:

Caused by: java.lang.StackOverflowError
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)

I believe the reason is that you call .withColumn twice per loop iteration. In the Spark execution plan, .withColumn is essentially a select of all columns with one column changed as specified. With 325 columns, a single iteration calls select on 325 columns twice, so 650 column references are passed to the planner. Repeat that 325 times and you can see how the overhead builds up.
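To make the depth argument concrete, here is a toy model in plain Python. It is only a sketch under loud assumptions: `PlanNode`, `prepare`, and the two builder functions are hypothetical names, not Spark classes, and the model only imitates the idea that each chained call wraps the previous plan in another node that a recursive walk must descend through.

```python
class PlanNode:
    """Toy stand-in for one operator in a query plan (NOT a Spark class)."""

    def __init__(self, name, child=None):
        self.name = name
        self.child = child


def prepare(node):
    """Walk the plan recursively and return how many nodes deep it is."""
    if node is None:
        return 0
    return 1 + prepare(node.child)


def plan_with_loop(n_cols):
    """Two wrapping steps per column, like two chained withColumn calls."""
    plan = PlanNode("scan")
    for i in range(n_cols):
        plan = PlanNode(f"replace_{i}", plan)  # UNKNOWN -> null step
        plan = PlanNode(f"fill_{i}", plan)     # window fill step
    return plan


def plan_with_single_select(n_cols):
    """One node carrying all expressions, like a single select."""
    return PlanNode(f"select_{n_cols}_expressions", PlanNode("scan"))


print(prepare(plan_with_loop(325)))           # 651
print(prepare(plan_with_single_select(325)))  # 2
```

In this model the recursion depth grows with the number of columns in the loop version and stays constant with a single select. Real Spark plans are optimized before execution, so the actual depth will differ; the sketch shows only the direction of the effect.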

It is interesting, though, that you do not get this error for a small sample; I would have expected it there too.

Anyway, you can try replacing your forwardFillImputer with this:

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
    window = Window \
     .partitionBy(F.month(partitioner)) \
     .orderBy(partitioner) \
     .rowsBetween(-sys.maxsize, 0)

    imputed_cols = [F.last(stringReplacer(F.col(i), value), ignorenulls=True).over(window).alias(i) 
                    for i in cols]

    missing_cols = [F.col(i) for i in df.columns if i not in cols]

    return df.select(missing_cols + imputed_cols)

This way you pass just a single select statement to the planner, which should be easier for it to handle.

As a warning: Spark generally doesn't do well with a high number of columns, so you might see other strange issues along the way.

Richard Nemeth
  • thank you for the tips! will try this tomorrow and let you know what happens ... any documentation you are aware of explaining why spark doesn't do well w many cols? – thePurplePython May 29 '19 at 22:34
  • Unfortunately no documentation, sorry :( It is just my experience when working with dataframes with 1000 and more columns. However I believe it's because Spark doesn't scale well in regards to number of columns, Spark mainly scales on Rows and Spark partitions dataframes on Rows, not on columns. But each Row contains all the columns selected, hence the performance degrades. Also if you have a lot of columns, you might hit the JVM limit as mentioned here https://stackoverflow.com/questions/44557739/maximum-number-of-columns-we-can-have-in-dataframe-spark-scala – Richard Nemeth May 29 '19 at 22:46
  • You are right, alias is missing. I've just updated the code, is it still returning that nonsense? – Richard Nemeth May 29 '19 at 22:54
  • Thanks! I see alias working well now ... will test this on the big dataset tomorrow. What is the purpose of ```missing_cols```? Ideally I don't want to impute on my partitioned cols so for output I changed to this ```df2 = forwardFillImputer(df, cols=[i for i in df.columns[1:]])``` – thePurplePython May 29 '19 at 23:00
  • added possible solution to question – thePurplePython May 29 '19 at 23:02
  • Well missing_cols are all the columns which were not imputed, i.e. columns present in the dataframe but not present in the `cols` list. Because we switched withColumn to select, we need to select all columns, so that we won't drop columns we want to keep. – Richard Nemeth May 29 '19 at 23:09
  • makes sense ... unfortunately getting the same error on the 300+ cols dataset – thePurplePython May 30 '19 at 13:32
  • what is your `spark.driver.memory` configuration? – Richard Nemeth May 30 '19 at 15:09
  • standard config ... not pulling anything back to driver though ... I was able to do the imputing on my skinny table and then pivot into the wide table. – thePurplePython May 31 '19 at 00:36
  • so 1g, try boosting it up anyway, for example to `10g`. Sometimes a lot of more hidden things happen on the driver than one expects. It is a long shot though. – Richard Nemeth May 31 '19 at 07:07
  • unfortunately this didn't make a difference ... wish the ```Caused by: java.lang.StackOverflowError at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)``` error was more straight-forward to identify the actual root cause – thePurplePython Jun 05 '19 at 03:39