I am processing parquet files in PySpark (Spark 3.3.0, per the paths in the traceback below). My data contains date and timestamp fields with values earlier than '1970-01-01'. I get the following error when running locally on macOS Monterey 12.6.1:
22/11/27 20:22:46 ERROR Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/types.py", line 788, in toInternal
    return tuple(
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/types.py", line 789, in <genexpr>
    f.toInternal(v) if c else v
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/types.py", line 591, in toInternal
    return self.dataType.toInternal(obj)
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/types.py", line 216, in toInternal
    calendar.timegm(dt.utctimetuple()) if dt.tzinfo else time.mktime(dt.timetuple())
OverflowError: mktime argument out of range
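The call that actually fails is Python's own time.mktime(), which TimestampType.toInternal() uses for naive (tzinfo-free) timestamps, as seen at pyspark/sql/types.py line 216 above. A minimal sketch outside Spark (the 1899 value is just illustrative, not taken from my data) hits the same error on platforms whose C mktime() cannot represent pre-epoch times, which appears to be the case here:

import datetime
import time

# A naive timestamp before the Unix epoch, like the pre-1970 values in the data
dt = datetime.datetime(1899, 12, 31, 0, 0, 0)

# Same call TimestampType.toInternal() makes for naive datetimes; on this
# machine it raises "OverflowError: mktime argument out of range"
print(time.mktime(dt.timetuple()))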
How can I resolve this error?
Adding more context:
I am reading data from a parquet file, processing it, and writing it to a Postgres table. Reading the parquet file and writing it to the table works fine. I also tried writing the dataframe back out as a parquet file as is; that also worked. Roughly, the part that works looks like the sketch below.
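(Paths here are placeholders, not the real ones.)

# Reading the parquet file works fine
df = spark.read.parquet("/path/to/input.parquet")

# ... processing / enrichment steps producing enriched_df ...

# Writing the dataframe back out as parquet, as is, also works fine
enriched_df.write.mode("overwrite").parquet("/path/to/copy.parquet")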
I was able to isolate the issue to the part where I capture records that failed the table insert and write them to a reject file. This is done by calling mapPartitions on the dataframe built from the parquet file. The core logic of the mapPartitions function is:
for row in rows:
    try:
        processed_row_count.add(1)  # accumulator
        cur.execute(insert_statement, row)
        accepted_row_count.add(1)  # accumulator
    except Exception as e:
        # Collect the records with the error message here and write to the reject file
        rejected_row_count.add(1)  # accumulator
        row = row.asDict()
        row["ErrorMessage"] = f"Error received from psycopg2 module is: {str(e)}"
        yield Row(**row)
conn.close()
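For completeness, the parts of process_rows not shown above look roughly like this (the connection string, insert statement, and accumulator setup are placeholders, not the exact code):

import psycopg2
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Accumulators referenced inside the partition function (names as in the snippet above)
processed_row_count = sc.accumulator(0)
accepted_row_count = sc.accumulator(0)
rejected_row_count = sc.accumulator(0)

def process_rows(rows):
    # One connection and cursor per partition; the DSN is a placeholder
    conn = psycopg2.connect("dbname=mydb user=me host=localhost")
    cur = conn.cursor()
    insert_statement = "INSERT INTO my_table VALUES (%s, %s, %s)"  # placeholder
    # ... loop over rows as shown above, yielding only the rejected rows ...
    conn.close()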
mapPartitions is called as follows:
rejected_on_load_df = enriched_df.rdd.mapPartitions(process_rows).toDF(
    enriched_df.schema.add("ErrorMessage", StringType(), nullable=False)
)
I get the error when writing rejected_on_load_df.
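The write itself is roughly as below (the format and path are illustrative, not the real ones). Since toDF() is lazy, the yielded Rows are only converted with the schema passed to toDF() once this action runs, which is when TimestampType.toInternal() calls time.mktime() on the naive pre-1970 timestamps and raises the OverflowError in the traceback above.

# Illustrative write; any action on rejected_on_load_df triggers the same conversion
rejected_on_load_df.write.mode("overwrite").parquet("/path/to/reject_file")  # placeholder path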